package com.lvyuetravel.util.rxutil;

import androidx.annotation.NonNull;
import com.lvyuetravel.base.MvpBasePresenter;
import com.lvyuetravel.util.LogUtils;
import rx.Emitter;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: classes2.dex */
public class RxUtils {
    public static <R> Subscription executeAsyncTask(@NonNull RxAsyncTask<R> rxAsyncTask) {
        return executeAsyncTask(rxAsyncTask, new RxThrowableAction());
    }

    public static <R> Subscription executeAsyncTask(@NonNull RxAsyncTask<R> rxAsyncTask, @NonNull Action1<Throwable> action1) {
        return Observable.create(getRxAsyncTaskOnSubscribe(rxAsyncTask), Emitter.BackpressureMode.LATEST).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() { // from class: com.lvyuetravel.util.rxutil.a
            @Override // rx.functions.Action1
            public final void call(Object obj) {
                r1.doInUIThread(((RxAsyncTask) obj).getOutData());
            }
        }, action1);
    }

    @NonNull
    private static <R> RxTaskOnSubscribe<RxAsyncTask<R>> getRxAsyncTaskOnSubscribe(@NonNull RxAsyncTask<R> rxAsyncTask) {
        return new RxTaskOnSubscribe<RxAsyncTask<R>>(rxAsyncTask) { // from class: com.lvyuetravel.util.rxutil.RxUtils.1
            @Override // rx.functions.Action1
            public void call(Emitter<RxAsyncTask<R>> emitter) {
                RxAsyncTask<R> task = getTask();
                task.setOutData(task.doInIOThread());
                emitter.onNext(task);
                emitter.onCompleted();
            }
        };
    }

    public static <T> void request(MvpBasePresenter mvpBasePresenter, Observable<T> observable, RxCallback<T> rxCallback) {
        if (mvpBasePresenter == null || rxCallback == null) {
            LogUtils.e("可取消订阅的网络请求中必须要有callBack回调的存在");
        } else {
            mvpBasePresenter.addSubscription(observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Subscriber) rxCallback));
        }
    }

    public static <T> void request(MvpBasePresenter mvpBasePresenter, Observable<T> observable, RxHighCallback<T> rxHighCallback) {
        if (mvpBasePresenter == null || rxHighCallback == null) {
            LogUtils.e("可取消订阅的网络请求中必须要有callBack回调的存在");
        } else {
            mvpBasePresenter.addSubscription(observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe((Subscriber) rxHighCallback));
        }
    }
}
