package com.taobao.message.eventengine.core;

import android.taobao.windvane.thread.WVThreadFactory$$ExternalSyntheticOutline0;
import com.alibaba.ariver.app.api.AppRestartResult$$ExternalSyntheticOutline0;
import com.alibaba.fastjson.JSON;
import com.taobao.message.eventengine.event.EventRepository;
import com.taobao.message.eventengine.event.EventRepositoryImpl;
import com.taobao.message.eventengine.report.UploadManager;
import com.taobao.message.kit.apmmonitor.toolbox.RandomUtil;
import com.taobao.message.kit.config.ConfigCenterManager;
import com.taobao.message.kit.threadpool.SaturativeExecutor;
import com.taobao.message.kit.util.Env;
import com.taobao.message.kit.util.MessageLog;
import com.taobao.message.kit.util.TextUtils;
import com.taobao.message.kit.util.TimeStamp;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.schedulers.ExecutorScheduler;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: classes9.dex */
/**
 * Buffers {@link ClientEvent}s published through {@link #publish(ClientEvent)} and hands them to
 * the {@link UploadManager} according to the {@link Strategy} registered per event name via
 * {@link #subscribe(String, Strategy)}.
 *
 * <p>Events are processed on a shared single-threaded executor. The per-event-name buffer and the
 * dispatch history are additionally guarded by an internal lock because {@link #action} is also
 * invoked from the cold-upload timer and buffer clean-up runs inside upload callbacks.
 */
public class EventDispatcher {
    private static final int QUEUE_CAPACITY = 4096;
    private static final String TAG = "EventDispatcher";

    /** Single worker shared by every dispatcher instance; oldest queued task is dropped when full. */
    private static final SaturativeExecutor mDispatchExecutor;

    private final EventRepository mEventRepository;
    private final String mIdentifier;
    private final UploadManager mUploadManager;
    private final Subject<Object> mPublish = PublishSubject.<Object>create().toSerialized();
    private final CompositeDisposable mCompositeDisposable = new CompositeDisposable();
    /** Pending events keyed by event name. Guarded by {@link #mBufferLock}. */
    private final Map<String, List<ClientEvent>> mBuffer = new HashMap<>();
    /** Last dispatch timestamp per event name. Guarded by {@link #mBufferLock}. */
    private final DispatcherHistory mDispatcherHistory = new DispatcherHistory();
    private final Object mBufferLock = new Object();

    /** Marker strategy: events are uploaded immediately, bypassing buffering and persistence. */
    public static final class DirectCommitStrategy extends Strategy {
    }

    /** Tunables controlling when and how buffered events of one event name are uploaded. */
    public static class Strategy {
        /**
         * Elapsed time since the last recorded dispatch after which the buffer is flushed. Compared
         * against differences of {@code TimeStamp.getCurrentTimeStamp()} (presumably milliseconds
         * - confirm against TimeStamp).
         */
        public long bufferTime = 0;
        /** Buffer size at which the buffer is flushed regardless of elapsed time. */
        public long bufferCount = 100;
        /** When true, events are saved to the repository before buffering and removed after upload. */
        public boolean stable = false;
        /**
         * Optional key into {@code ClientEvent.getExt()}; buffered events sharing the same value for
         * this key are collapsed to the one with the latest event time before upload.
         */
        public String mergeTopicKey = null;
    }

    static {
        SaturativeExecutor executor = new SaturativeExecutor(1, 1, QUEUE_CAPACITY);
        executor.setThreadFactory(new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable runnable) {
                String name = "MessageEvent-" + mCount.getAndIncrement();
                Thread thread = new Thread(runnable, name);
                MessageLog.d(EventDispatcher.TAG, "Spawning ", name);
                return thread;
            }
        });
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                super.rejectedExecution(runnable, threadPoolExecutor);
                MessageLog.e(EventDispatcher.TAG, "rejectedExecution task");
            }
        });
        executor.allowCoreThreadTimeOut(true);
        mDispatchExecutor = executor;
    }

    /**
     * @param str identifier passed to both the event repository and the upload manager
     */
    public EventDispatcher(String str) {
        this.mIdentifier = str;
        this.mEventRepository = new EventRepositoryImpl(str);
        this.mUploadManager = new UploadManager(this.mIdentifier);
    }

    /**
     * Processes a batch of events for one event name: uploads it directly for a
     * {@link DirectCommitStrategy}, otherwise buffers it and flushes the buffer once the
     * strategy's time or count threshold is reached.
     *
     * <p>Kept public because the decompiled sources expose it that way; it is only called from
     * within this class in the visible code.
     *
     * @param str      event name the batch belongs to
     * @param strategy strategy registered for that event name; must not be null
     * @param list     events to process; may be empty (used by the cold-upload timer)
     */
    public void action(final String str, final Strategy strategy, List<ClientEvent> list) {
        if (Env.isDebug()) {
            MessageLog.d(TAG, "AcionEvent|", str, "|", Integer.valueOf(list.size()), "|", JSON.toJSONString(list), "|", JSON.toJSONString(strategy));
        } else {
            MessageLog.e(TAG, "AcionEvent|", str, "|", Integer.valueOf(list.size()), "|", JSON.toJSONString(strategy));
        }
        if (strategy instanceof DirectCommitStrategy) {
            uploadDirectly(str, list);
            return;
        }
        // action() is reached from the dispatch executor and from the cold-upload timer thread,
        // so every access to mBuffer / mDispatcherHistory is serialized here.
        synchronized (this.mBufferLock) {
            dispatchBuffered(str, strategy, list);
        }
    }

    /** No-op in this version. */
    public void dispatchNow() {
    }

    /**
     * Emits the event to every subscription created by {@link #subscribe(String, Strategy)}.
     *
     * @param clientEvent event to publish
     */
    public void publish(ClientEvent clientEvent) {
        this.mPublish.onNext(clientEvent);
    }

    /** Disposes all subscriptions, pending uploads and timers. (Name kept as-is for callers.) */
    public void shotdown() {
        this.mCompositeDisposable.dispose();
    }

    /**
     * Starts routing published events named {@code str} through {@link #action} on the shared
     * dispatch executor. Unless the {@code engineUploadCold} config is switched off, it also
     * schedules one delayed {@link #action} call with an empty batch, 6000 ms plus a random
     * offset from {@code RandomUtil.getRandomNumberInRange(6000)} after subscribing.
     *
     * @param str      event name to listen for
     * @param strategy strategy applied to matching events
     */
    public void subscribe(final String str, final Strategy strategy) {
        // Public factory for what the decompiler showed as an inlined ExecutorScheduler.
        final Scheduler dispatchScheduler = Schedulers.from(mDispatchExecutor);
        this.mCompositeDisposable.add(this.mPublish.observeOn(dispatchScheduler).ofType(ClientEvent.class).filter(new Predicate<ClientEvent>() {
            @Override
            public boolean test(ClientEvent clientEvent) throws Exception {
                return TextUtils.equals(str, clientEvent.getEventName());
            }
        }).map(new Function<ClientEvent, List<ClientEvent>>() {
            @Override
            public List<ClientEvent> apply(ClientEvent clientEvent) throws Exception {
                return Collections.singletonList(clientEvent);
            }
        }).subscribe(new Consumer<List<ClientEvent>>() {
            @Override
            public void accept(List<ClientEvent> list) throws Exception {
                EventDispatcher.this.action(str, strategy, list);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable th) throws Exception {
                MessageLog.e(EventDispatcher.TAG, th.toString());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                MessageLog.e(EventDispatcher.TAG, "completed");
            }
        }));
        if ("1".equals(ConfigCenterManager.getContainerConfig("engineUploadCold", "1"))) {
            this.mCompositeDisposable.add(Observable.timer(RandomUtil.getRandomNumberInRange(6000) + 6000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long l) throws Exception {
                    EventDispatcher.this.action(str, strategy, new ArrayList<ClientEvent>());
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable th) throws Exception {
                    MessageLog.e(EventDispatcher.TAG, "error|" + str + "|" + th.toString());
                }
            }));
        }
    }

    /** Uploads {@code events} immediately and only logs the outcome. */
    private void uploadDirectly(final String eventName, List<ClientEvent> events) {
        this.mCompositeDisposable.add(this.mUploadManager.upload(events).subscribe(new Consumer<List<ClientEvent>>() {
            @Override
            public void accept(List<ClientEvent> uploaded) {
                logBatch("UploadSuccess|", eventName, uploaded);
            }
        }, newUploadErrorLogger()));
    }

    /**
     * Adds {@code events} to the buffer of {@code eventName} and flushes that buffer when the
     * strategy's thresholds are met. Caller must hold {@link #mBufferLock}.
     */
    private void dispatchBuffered(final String eventName, final Strategy strategy, List<ClientEvent> events) {
        if (strategy.stable) {
            this.mEventRepository.save(events);
        }
        List<ClientEvent> buffered = this.mBuffer.get(eventName);
        if (buffered != null) {
            buffered.addAll(events);
        } else {
            List<ClientEvent> persisted = this.mEventRepository.getSortedEvent();
            buffered = persisted == null ? new ArrayList<ClientEvent>() : new ArrayList<>(persisted);
            // Stable events were saved just above and are expected to come back from the
            // repository (NOTE(review): confirm save() is visible to getSortedEvent()).
            // Non-stable events are never persisted, so they must be buffered explicitly or the
            // first event of every non-stable subscription is silently lost.
            if (!strategy.stable) {
                buffered.addAll(events);
            }
            this.mBuffer.put(eventName, buffered);
        }

        long lastDispatch = this.mDispatcherHistory.getDispatchTime(eventName);
        long now = TimeStamp.getCurrentTimeStamp();
        if (lastDispatch == 0) {
            // First batch for this event name only opens the buffering window.
            this.mDispatcherHistory.saveDispatchTime(eventName, now);
            return;
        }
        if (now - lastDispatch < strategy.bufferTime && buffered.size() < strategy.bufferCount) {
            return;
        }

        logBatch("Upload|", eventName, buffered);
        this.mDispatcherHistory.saveDispatchTime(eventName, now);
        List<ClientEvent> toUpload = mergeByTopic(buffered, strategy.mergeTopicKey);
        logBatch("Upload|", eventName, toUpload);
        if (toUpload.isEmpty()) {
            return;
        }

        // Snapshot what this flush covers. The live buffer keeps receiving events while the upload
        // is in flight, and it is the very list stored in mBuffer, so removing "the buffer from
        // itself" would both drop not-yet-uploaded events and leave nothing to delete from the
        // repository afterwards.
        final List<ClientEvent> flushed = new ArrayList<>(buffered);
        this.mCompositeDisposable.add(this.mUploadManager.upload(toUpload).subscribe(new Consumer<List<ClientEvent>>() {
            @Override
            public void accept(List<ClientEvent> uploaded) {
                logBatch("UploadSuccess|", eventName, uploaded);
                if (uploaded.isEmpty()) {
                    return;
                }
                synchronized (EventDispatcher.this.mBufferLock) {
                    List<ClientEvent> current = EventDispatcher.this.mBuffer.get(eventName);
                    if (current != null) {
                        current.removeAll(flushed);
                    }
                }
                if (strategy.stable) {
                    EventDispatcher.this.mEventRepository.remove(flushed);
                }
            }
        }, newUploadErrorLogger()));
    }

    /**
     * Returns the events to upload: a copy of {@code events} when {@code mergeTopicKey} is null,
     * otherwise the events lacking that ext key followed by, for each distinct value of the key,
     * the event with the latest event time (later entries win ties).
     */
    private static List<ClientEvent> mergeByTopic(List<ClientEvent> events, String mergeTopicKey) {
        if (mergeTopicKey == null) {
            return new ArrayList<>(events);
        }
        List<ClientEvent> merged = new ArrayList<>();
        Map<Object, ClientEvent> latestByTopic = new HashMap<>();
        for (ClientEvent event : events) {
            if (event.getExt() == null || !event.getExt().containsKey(mergeTopicKey)) {
                merged.add(event);
                continue;
            }
            Object topic = event.getExt().get(mergeTopicKey);
            ClientEvent latest = latestByTopic.get(topic);
            if (latest == null || event.getEventTime() >= latest.getEventTime()) {
                latestByTopic.put(topic, event);
            }
        }
        merged.addAll(latestByTopic.values());
        return merged;
    }

    /** Logs a batch with its payload in debug builds and with its size only otherwise. */
    private static void logBatch(String stage, String eventName, List<ClientEvent> events) {
        if (Env.isDebug()) {
            MessageLog.d(TAG, stage, eventName, "|", Integer.valueOf(events.size()), "|", JSON.toJSONString(events));
        } else {
            MessageLog.e(TAG, stage, eventName, "|", Integer.valueOf(events.size()));
        }
    }

    /** Creates the error callback shared by both upload paths; it only logs the failure. */
    private static Consumer<Throwable> newUploadErrorLogger() {
        return new Consumer<Throwable>() {
            @Override
            public void accept(Throwable th) throws Exception {
                MessageLog.e(EventDispatcher.TAG, "UploadError|" + th.toString());
            }
        };
    }
}
