RxJava2 Android 中使用


什么是RxJava

实现异步操作的库

定义

RxJava 是基于事件流的、实现异步操作的库

作用

实现异步操作

使用

添加依赖

    implementation "io.reactivex.rxjava2:rxjava:2.2.6"
    implementation "io.reactivex.rxjava2:rxandroid:2.1.0"

使用一次订阅并打印出来

  • 需要 Observable.fromArray 方法

fromArray用来创建一个Observable(被观察者)对象,可以将一个数组转化为可被观察的序列并且将它的数据逐个发送。

返回值:Observable

  • 需要 subscribe 方法

subscribe 只用于连接被观察者和观察者”“()”圆括号内为观察者”

  • 需要 Consumber 类作为参数

当你只关心观察者的onNext方法时可以使用Consumer类

  • 需要 Subscriber.onNext() 方法

被观察者执行Subscriber.onNext()方法时会在 观察者 订阅时复写该方法来进行发送数据

启动它即可

        Observable.fromArray("Ted", "Ryan", "Billy")
            .subscribe { onNext -> println("name: $onNext") 

观察者打印了字符

Android Kotlin - RxJava Intro

刷新一次界面控件

  • 需要 Obserbable.subscribeOn()

指定Observable(被观察者)自身在哪个调度器上执行

  • 需要 Schedule() 线程控制器

作用:指定每一段代码在什么样的线程中执行

  • 需要 filer() 过滤操作符号

作用:输出过滤条件后的结果项。

  • 需要 Obserbable.observeOn() 方法

指定 Subscribe 所运行在的线程。或者事件消费的线程。

指定一个观察者在哪个调度器上观察这个Observable。

observeOn()可以多次使用,可以随意变换线程

  • 使用 AndroidSchedulers.mainThread() 方法

切换至主线程

        Observable.fromArray("Ted", "Ryan", "Billy")
            .subscribeOn(Schedulers.newThread())
            .filter { item -> item == "Ted" }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { onNext -> textView.text = onNext }

执行一次网络访问

  • 使用 Observable.create() 方法

作用:创建一个被观察者。

可以通过泛型指定发送数据的类型

参数:复写 ObservableOnSubscribe 类型,当被观察者被订阅时会执行 subscribe() 中的事件。

  • 需要 onError() 方法

被观察者执行Subscriber.onError()方法时会在 观察者 订阅时复写该方法来进行发送数据

参数: 发送的泛型数据类型

  • 需要 onComplate() 方法

被观察者执行Subscriber.onComplate()方法时会在 观察者 订阅时复写该方法来进行发送数据

参数: 发送的泛型数据类型

创建 方法 getTextFromNetwork ()

创建 Observable (被观察者)处理完成后发送事件,subscribeOn 让 subscribe 到一个新的线程中执行,observeOn 切换观察者的线程到主线程刷新UI,并不断接受被观察者发送来的事件。这是一个异步的操作。

    fun getTextFromNetwork() {
        val task = Observable.create<String> { subscriber ->
            try {
                subscriber.onNext("网络访问")
            } catch (e: Exception) {
                subscriber.onError(e)
            }
            subscriber.onComplete()
        }
        task.subscribeOn(Schedulers.newThread())
        task.observeOn(AndroidSchedulers.mainThread())
            .subscribe { onNext -> textView.text = onNext }
    }

在调用 MainActivity

补充

Observer 接口

        val observer = object: Observable<String>() {
            //观察者接收事件前,默认最先调用复写 onSubscribe()
            override fun subscribeActual(observer: Observer<in String>?) {
                TODO("Not yet implemented")
            }

        }

Subscriber 接口

Subscriber 接口对 Observer 接口进行了扩展

        val subscribe= object : Subscriber<String>{
            //被观察者调用 onSubscribe 时发送事件,观察者会调用此方法进行响应接受此事件
            override fun onSubscribe(s: Subscription?) {
                TODO("Not yet implemented")
            }
            //被观察者调用 onNext 时发送事件,观察者会调用此方法进行响应接受此事件
            override fun onNext(t: String?) {
                TODO("Not yet implemented")
            }
            //被观察者调用 onError 时发送事件,观察者会调用此方法进行响应接受此事件
            override fun onError(t: Throwable?) {
                TODO("Not yet implemented")
            }
            //被观察者调用 onComplete 时发送事件,观察者会调用此方法进行响应接受此事件
            override fun onComplete() {
                TODO("Not yet implemented")
            }

        }

Single与SingleObserver

什么情况下使用 Single。

如果你使用一个单一的连续事件流,既然只有一个onNext()事件,接着就触发onComplete或者onError,这样你使用Single。

Single共包含那些

一个正常处理成功的onSuccess,另一个处理失败的onError,当然它之发送一次信息,其中Single类似于Observable。

被观察者

        val single: Single<String> = Single.create<String>(object : SingleOnSubscribe<String> {
            override fun subscribe(emitter: SingleEmitter<String>) {
                emitter.onSuccess("t")
                emitter.onSuccess("c")
                // 连续发送两次是不能成功的。
            }
        })

观察者

        single.subscribe(object : SingleObserver<String> {
            // 在被观察者调用 onSubscribe 后,观察者回调到这里。
            override fun onSubscribe(d: Disposable) {
            }
            // 在被观察者调用 onSuccess 后,观察者回调到这里。
            override fun onSuccess(t: String) {
                tv.text = t
            }
            // 在被观察者调用 onError 后,观察者回调到这里。
            override fun onError(e: Throwable) {
            }

        })

并没有按照预期变成“c”

所有 single只适合单次事件流。

RxJava中关于 Disposable

Disposable 类

  • dispose()

主动解除订阅

  • isDisposed()

查询是否解除订阅 。(true 代表解除)

什么是同步

在执行功能前必须一件一件做完才能进行下一步。

什么是异步

与同步是相对的,在我们执行完某个功能之后,我们并不需要立刻得到结果,我们可以正确的做其他的操作,这个功能可以在完成后通知或者回调告诉我们;

例如:后台下载的例子中,在执行下载功能后,我们无需关心它的下载过程,在它下载完毕之后通知我们就可以了。

观察者模式

观察者模式

当对象间存在一对多的关系时,则使用观察者模式(Observer Patterm)。 例如:当前对象被修改时这会通知依赖它的对象。观察者模式属于行为型模式。

介绍

意图:定义对象之间的一种对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并且被自动更新。

主要解决:一个对象状态改变给其他对象通知的问题,而且要考虑到易用性和低耦合,保证高度的协作。

何时使用:一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。

如何解决当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

如何解决:使用面向对象技术,可以将这种关系弱化。

关键代码:在抽象类中有一个ArrayList存放观察者。

应用实例

  1. 拍卖时,拍卖师观察最高标价,然后通知其他竞价者竞价。
  2. 菩萨通过洒水招来老乌龟,老乌龟就是观察者,观察菩萨的洒水动作

优点

  1. 观察者和被观察者是抽象耦合的。
  2. 建立一套触发机制

缺点

  1. 如果一个被观察者对象有很多直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间。
  2. 如果观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃。
  3. 观察者模式没有相应的机制让观察者知道所观察的目标对象是怎么发生变化的,而仅仅知道观察目标发送变化。

使用场景

一个抽象模型有两个方面,其中一个方面依赖于另一个方面。将这些方面封装到独立的对象中使它们可以各自独立的改变和复用。

一个对象的改变将导致一个或多个对象也发生改变,而不知道有多少对象改变,可以降低对象之间的耦合度。

一个对象必须通知其他对象,而不知道这个对象时谁。

需要在系统中创建一个触发链,A对象的行为将影响B对象,B对象的行为将影响C对象……。可以使用观察者模式创建一种链式触发机制。

谁是观察者谁是被观察者?

当你做出了某些动作另一个人根据你的动作做出某些行为,这时你就是被观察者,另一个人是观察者。

订阅-发布模式

订阅-发布模式是观察者模式的另一个别称。

但是随着时间的变化,已经独立于观察者模式,成为另一种的设计模式。

在现在的发布订阅模式中,成为发布者的消息发送者不会将信息直接发送给订阅者,这意味这发布者和订阅者不知道彼此的存在。在发布者和订阅者之间存在第三个组件,称为调度中心或事件通道,它维持着发布者和订阅者之间的联系,过滤所有发布者传入的信息并相应的分发给它的订阅者。

例子:

你在微博关注了A,同时其他很多人也关注了A,那么当A发布动态的时候,微博就会为你推送这条动态。A就是发布者,你是订阅者,微博就是调度中心,你和A之间没有直接的消息往来,全是通过微博来协调的。

如果有些方法没有详细说明你可以自行搜索查看

参考

观察者模式与订阅发布模式的区别

RxJava 2.x 使用详解(一) 快速入门

Rxjava关于Disposable你应该知道的事


文章作者: TheCara
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC 4.0 许可协议。转载请注明来源 TheCara !
  目录