package defpackage;

import com.google.common.base.Optional;
import com.spotify.base.java.logging.Logger;
import com.spotify.cosmos.pubsub.PubSubCosmosClient;
import com.spotify.cosmos.pubsub.model.PubSub;
import io.reactivex.functions.g;
import io.reactivex.functions.m;
import io.reactivex.functions.o;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.u;
import java.util.concurrent.atomic.AtomicReference;
import kotlin.jvm.internal.i;

/* loaded from: classes5.dex */
/**
 * Session-scoped access to pubsub messages delivered through {@link PubSubCosmosClient}.
 *
 * <p>The class keeps an optional "stop" subject in an {@link AtomicReference}. {@link #c()}
 * installs a fresh subject, {@link #b(String, lqj)} requires one to be present and ends its
 * stream when that subject emits, and {@link #a()} removes the subject and emits on it.
 * Judging by the exception messages below, {@code c()} presumably corresponds to
 * "onSessionLogin" and {@code a()} to "onSessionLogout" — the names are obfuscated, so
 * confirm against {@code aph}.
 */
public final class bph implements aph {
    /** Statistics sink; notified of received messages, failed conversions and logout. */
    private final eph a;
    /** Source of the per-ident pubsub streams. */
    private final PubSubCosmosClient b;
    /** Holds the current stop subject; absent while no session subject is installed. */
    private final AtomicReference<Optional<PublishSubject<ep0>>> c;

    /**
     * @param pubSubStats statistics sink, must not be null
     * @param pubSubCosmosClient client providing the per-ident message streams, must not be null
     */
    public bph(eph pubSubStats, PubSubCosmosClient pubSubCosmosClient) {
        i.e(pubSubStats, "pubSubStats");
        i.e(pubSubCosmosClient, "pubSubCosmosClient");
        this.a = pubSubStats;
        this.b = pubSubCosmosClient;
        this.c = new AtomicReference<>(Optional.a());
    }

    /**
     * Converts one raw pubsub message with the caller-supplied transformer.
     *
     * <p>Kept {@code public static} because it is the compiler-generated body of a lambda used
     * in {@link #b(String, lqj)}.
     *
     * @param this$0 owning instance whose stats sink is notified on failure
     * @param ident ident the subscription was made for (reported to the stats sink on failure)
     * @param pushedMessageTransformer function applied to the wrapped message
     * @param it raw message received from the client
     * @return an Optional holding the transformed value, or an absent Optional when the
     *         transformer returned {@code null} or threw an {@link Exception}
     */
    public static Optional d(bph this$0, String ident, lqj pushedMessageTransformer, PubSub it) {
        i.e(this$0, "this$0");
        i.e(ident, "$ident");
        i.e(pushedMessageTransformer, "$pushedMessageTransformer");
        i.e(it, "it");
        try {
            Object invoke = pushedMessageTransformer.invoke(new hph(it.getIdent(), it.getPayload(), it.getAttributes()));
            if (invoke != null) {
                Optional present = Optional.e(invoke);
                i.d(present, "of(entity)");
                return present;
            }
            // A null result is treated as a conversion failure: log, count, and drop.
            Logger.d("Error while transforming pushed message with ident %s", it.getIdent());
            this$0.a.b(ident);
            Optional absent = Optional.a();
            i.d(absent, "absent()");
            return absent;
        } catch (Exception e2) {
            // Transformer failures are recorded the same way instead of propagating.
            Logger.e(e2, "Exception while transforming message for %s", it.getIdent());
            this$0.a.b(ident);
            Optional absent = Optional.a();
            i.d(absent, "absent()");
            return absent;
        }
    }

    /**
     * Reports a received message for {@code ident} to the stats sink.
     *
     * <p>Kept {@code public static} because it is the compiler-generated body of a lambda used
     * in {@link #b(String, lqj)}. The message itself is not inspected.
     *
     * @param this$0 owning instance
     * @param ident ident the message was received for
     * @param pubSub the received message (unused)
     */
    public static void e(bph this$0, String ident, PubSub pubSub) {
        i.e(this$0, "this$0");
        i.e(ident, "$ident");
        this$0.a.c(ident);
    }

    /**
     * Removes the current stop subject, emits on it so that streams returned by
     * {@link #b(String, lqj)} stop, and notifies the stats sink.
     *
     * @throws IllegalStateException if no stop subject was installed
     */
    @Override // defpackage.aph
    public void a() {
        // Swap the subject out atomically so it can only be signalled once.
        Optional<PublishSubject<ep0>> andSet = this.c.getAndSet(Optional.a());
        if (!andSet.d()) {
            throw new IllegalStateException("called onSessionLogout before onSessionLogin");
        }
        andSet.c().onNext(ep0.a());
        this.a.a();
    }

    /**
     * Returns a stream of transformed pubsub messages for {@code ident}.
     *
     * <p>The stream takes messages until the current stop subject emits, registers every
     * message with the stats sink, converts it via {@link #d}, and unwraps the result.
     * Upstream errors are re-emitted wrapped in an {@link IllegalStateException} whose
     * message names the ident.
     *
     * @param ident ident to observe, must not be null
     * @param pushedMessageTransformer conversion applied to every message, must not be null
     * @param <T> type produced by the transformer
     * @return the stream of transformed messages
     * @throws IllegalStateException if no stop subject is currently installed
     */
    @Override // defpackage.aph
    public <T> u<T> b(final String ident, final lqj<? super hph, ? extends T> pushedMessageTransformer) {
        i.e(ident, "ident");
        i.e(pushedMessageTransformer, "pushedMessageTransformer");
        Optional<PublishSubject<ep0>> optional = this.c.get();
        if (!optional.d()) {
            throw new IllegalStateException("tried to subscribe before onSessionLogin or after onSessionLogout");
        }
        u<T> z0 = this.b.observableForIdent(ident).W0(optional.c()).W(new g() { // from class: yoh
            @Override // io.reactivex.functions.g
            public final void accept(Object obj) {
                bph.e(bph.this, ident, (PubSub) obj);
            }
        }).s0(new m() { // from class: woh
            @Override // io.reactivex.functions.m
            public final Object apply(Object obj) {
                return bph.d(bph.this, ident, pushedMessageTransformer, (PubSub) obj);
            }
        }).M0(new o() { // from class: uoh
            // NOTE(review): this stage is skipWhile, so it only drops *leading* absent results.
            // An absent result arriving after the first present one would reach the get() in
            // the next stage, which presumably throws and terminates the stream through the
            // error handler below — confirm whether a filter was intended here.
            @Override // io.reactivex.functions.o
            public final boolean test(Object obj) {
                Optional it = (Optional) obj;
                i.e(it, "it");
                return !it.d();
            }
        }).s0(new m() { // from class: voh
            @Override // io.reactivex.functions.m
            public final Object apply(Object obj) {
                Optional it = (Optional) obj;
                i.e(it, "it");
                return it.c();
            }
        }).z0(new m() { // from class: xoh
            @Override // io.reactivex.functions.m
            public final Object apply(Object obj) {
                // The message is built by plain concatenation, so it must not carry a "%s"
                // placeholder (the original produced "... for ident %s<ident>").
                return u.Y(new IllegalStateException("Error while receiving pubsub message for ident " + ident, (Throwable) obj));
            }
        });
        i.d(z0, "pubSubCosmosClient.observableForIdent(ident)\n                .takeUntil(stopObservable.get())\n                .doOnNext {\n                    pubSubStats.registerMessage(ident)\n                }\n                .map {\n                    convert(ident, it, pushedMessageTransformer)\n                }\n                .skipWhile {\n                    !it.isPresent()\n                }.map {\n                    it.get()\n                }.onErrorResumeNext { throwable: Throwable? ->\n                    Observable.error(\n                        IllegalStateException(\n                            \"Error while receiving pubsub message for ident %s\" + ident,\n                            throwable\n                        )\n                    )\n                }");
        return z0;
    }

    /**
     * Installs a fresh stop subject, replacing whatever was stored before, so that
     * {@link #b(String, lqj)} can be called.
     */
    @Override // defpackage.aph
    public void c() {
        this.c.set(Optional.e(PublishSubject.q1()));
    }
}
