package com.pushtechnology.diffusion.client.internal.routing;

import com.pushtechnology.diffusion.client.callbacks.Stream;
import com.pushtechnology.diffusion.client.content.Content;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.session.SessionErrorImpl;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.collections.ImmutableSet;
import com.pushtechnology.diffusion.collections.Predicate;
import com.pushtechnology.diffusion.datatype.Bytes;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import com.pushtechnology.diffusion.topics.details.UniversalTopicDetails;
import com.pushtechnology.diffusion.topics.info.TopicInfo;
import com.pushtechnology.diffusion.util.concurrent.threads.CommonThreadPools;
import com.pushtechnology.diffusion.util.concurrent.threads.ExecutionPool;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
/* loaded from: input_file:com/pushtechnology/diffusion/client/internal/routing/TopicRoutingImpl.class */
public final class TopicRoutingImpl implements TopicRouting {
    /** Logger for this class; assigned in the static initializer at the end of the class. */
    private static final Logger LOG;

    /** Data type registry used to resolve data types by topic type name and by value class. */
    private final DataTypes dataTypes;

    /** Pool on which every task submitted through {@code execute} is scheduled. */
    private final ExecutionPool inboundThreadPool;

    /** Cache entries of the currently subscribed topics, keyed by topic path. */
    @InboundThreadOnly
    private final Map<String, TopicCacheEntry> byPath = new HashMap<String, TopicCacheEntry>();

    /** The same cache entries as {@link #byPath}, keyed by topic ID. */
    @InboundThreadOnly
    private final Map<Integer, TopicCacheEntry> byId = new HashMap<Integer, TopicCacheEntry>();

    /** Registered stream proxies grouped by the selector they were registered with; values are never stored empty. */
    @InboundThreadOnly
    private final Map<TopicSelector, ImmutableSet<StreamProxy>> streamProxies =
        new HashMap<TopicSelector, ImmutableSet<StreamProxy>>();

    /** Registered fallback stream proxies; passed to cache entries alongside every notification. */
    @InboundThreadOnly
    private ImmutableSet<StreamProxy> fallbackProxies = ImmutableSet.empty();

    /**
     * When {@code true}, new subscriptions get a cache entry that stores no value.
     * Initialised from the {@code diffusion.disabletopicvaluecache} system property.
     */
    private volatile boolean cacheDisabled = Boolean.getBoolean("diffusion.disabletopicvaluecache");

    /** Set once by the task scheduled from {@code close()}; checked before any further work is scheduled or run. */
    private volatile boolean closed = false;

    // Decompiler rendering of the compiler-generated assertion flag; read by notifyUnsubscription.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Base class for the tasks this routing schedules: the task body is skipped
     * if the enclosing routing has been marked closed by the time the task runs.
     *
     * <p>Decompiler note: the class was originally declared {@code private}.
     */
    /* loaded from: input_file:com/pushtechnology/diffusion/client/internal/routing/TopicRoutingImpl$RunIfNotClosed.class */
    public abstract class RunIfNotClosed implements Runnable {
        private RunIfNotClosed() {
        }

        /** Delegates to {@link #doRun()} only while the enclosing routing is still open. */
        @Override // java.lang.Runnable
        public final void run() {
            if (!TopicRoutingImpl.this.closed) {
                doRun();
            }
        }

        /** The work to perform; invoked only when the routing was not closed at the time of the check. */
        protected abstract void doRun();
    }

    public TopicRoutingImpl(DataTypes dataTypes, CommonThreadPools commonThreadPools) {
        this.dataTypes = dataTypes;
        this.inboundThreadPool = commonThreadPools.getInboundThreadPool();
    }

    /**
     * Registers a newly subscribed topic and notifies the matching streams.
     *
     * <p>A cache entry is chosen from the topic type: a no-value entry when value
     * caching is disabled, a universal entry for types listed in
     * {@code UniversalTopicDetails.TOPIC_TYPES}, a single-value entry for
     * {@code SINGLE_VALUE}, and a no-value entry otherwise. The entry is then
     * indexed by both path and topic ID.
     *
     * @param internalSession the session the notification belongs to (not used here)
     * @param topicInfo description of the subscribed topic
     * @throws IllegalStateException if an entry already exists for the topic's path or ID;
     *         in that case neither index is modified
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifySubscription(InternalSession internalSession, TopicInfo topicInfo) {
        final String path = topicInfo.getPath();
        final TopicType type = topicInfo.getDetails().getType();
        final ImmutableSet<StreamProxy> streams = streamsForTopic(path, type);

        final TopicCacheEntry entry;
        if (this.cacheDisabled) {
            entry = new TopicCacheEntryNoValue(topicInfo, streams);
        } else if (UniversalTopicDetails.TOPIC_TYPES.contains(type)) {
            // Locale.ROOT: the default-locale overload would map 'I' to a dotless 'ı'
            // under e.g. a Turkish locale ("BINARY" -> "bınary") and break the lookup.
            final DataType<?> dataType = this.dataTypes.getByName(type.name().toLowerCase(Locale.ROOT));
            entry = new TopicCacheEntryUniversal(topicInfo, dataType, dataType.deltaType("binary"), streams);
        } else if (type == TopicType.SINGLE_VALUE) {
            entry = new TopicCacheEntrySingleValue(topicInfo, streams);
        } else {
            entry = new TopicCacheEntryNoValue(topicInfo, streams);
        }

        // Check both indexes before touching either, so a duplicate leaves them consistent.
        final TopicCacheEntry existingForPath = this.byPath.get(path);
        if (existingForPath != null) {
            throw new IllegalStateException("Existing entry " + existingForPath + " found for path " + path + " when adding " + entry);
        }
        final int topicId = topicInfo.getTopicId();
        final Integer idKey = Integer.valueOf(topicId);
        final TopicCacheEntry existingForId = this.byId.get(idKey);
        if (existingForId != null) {
            throw new IllegalStateException("Existing entry " + existingForId + " found for id " + topicId + " when adding " + entry);
        }

        this.byPath.put(path, entry);
        this.byId.put(idKey, entry);
        entry.notifyInitialStreamsOfSubscription(this.fallbackProxies);
    }

    /**
     * Schedules a task that notifies every cached topic's streams of an
     * unsubscription with reason {@code SUBSCRIPTION_REFRESH} and then empties
     * both topic indexes. Does nothing if the routing is already closed.
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    public void notifyUnsubscriptionOfAllTopics() {
        if (this.closed) {
            return;
        }
        execute(new RunIfNotClosed() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.1
            @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.RunIfNotClosed
            protected void doRun() {
                for (TopicCacheEntry entry : TopicRoutingImpl.this.byId.values()) {
                    entry.notifyStreamsOfUnsubscription(TopicRoutingImpl.this.fallbackProxies, Topics.UnsubscribeReason.SUBSCRIPTION_REFRESH);
                }
                TopicRoutingImpl.this.byId.clear();
                TopicRoutingImpl.this.byPath.clear();
            }
        });
    }

    /**
     * Dispatches a value received for the topic with the given ID to its cache entry.
     *
     * <p>If no entry is registered for the ID, an error is reported to the
     * session's error handler and the session is closed. Ignored once this
     * routing is closed.
     *
     * @param internalSession session that received the value
     * @param i topic ID
     * @param content the received value
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifyValue(InternalSession internalSession, int i, Content content) {
        if (this.closed) {
            return;
        }
        final InternalSession.InternalErrorHandler handler = internalSession.getErrorHandler();
        final TopicCacheEntry entry = this.byId.get(Integer.valueOf(i));
        if (entry == null) {
            final String message = "Data loss on topic with ID " + i + " - possibly due to reconnection : session closing";
            handler.notifyError(new SessionErrorImpl(message, null));
            internalSession.close();
            return;
        }
        entry.handleValue(content, this.fallbackProxies, handler);
    }

    /**
     * Dispatches a value received for the topic with the given path to its cache entry.
     *
     * <p>If no entry is registered for the path, an error is reported to the
     * session's error handler and the session is closed. Ignored once this
     * routing is closed.
     *
     * @param internalSession session that received the value
     * @param str topic path
     * @param content the received value
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifyValue(InternalSession internalSession, String str, Content content) {
        if (this.closed) {
            return;
        }
        final InternalSession.InternalErrorHandler handler = internalSession.getErrorHandler();
        final TopicCacheEntry entry = this.byPath.get(str);
        if (entry == null) {
            final String message = "Data loss on topic " + str + " - possibly due to reconnection : session closing";
            handler.notifyError(new SessionErrorImpl(message, null));
            internalSession.close();
            return;
        }
        entry.handleValue(content, this.fallbackProxies, handler);
    }

    /**
     * Dispatches a delta received for the topic with the given ID to its cache entry.
     *
     * <p>If no entry is registered for the ID, an error is reported to the
     * session's error handler and the session is closed. Ignored once this
     * routing is closed.
     *
     * @param internalSession session that received the delta
     * @param i topic ID
     * @param content the received delta
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifyDelta(InternalSession internalSession, int i, Content content) {
        if (this.closed) {
            return;
        }
        final InternalSession.InternalErrorHandler handler = internalSession.getErrorHandler();
        final TopicCacheEntry entry = this.byId.get(Integer.valueOf(i));
        if (entry == null) {
            final String message = "Data loss on topic with ID " + i + " - possibly due to reconnection : session closing";
            handler.notifyError(new SessionErrorImpl(message, null));
            internalSession.close();
            return;
        }
        entry.handleDelta(content, this.fallbackProxies, handler);
    }

    /**
     * Dispatches a delta received for the topic with the given path to its cache entry.
     *
     * <p>If no entry is registered for the path, an error is reported to the
     * session's error handler and the session is closed. Ignored once this
     * routing is closed.
     *
     * @param internalSession session that received the delta
     * @param str topic path
     * @param content the received delta
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifyDelta(InternalSession internalSession, String str, Content content) {
        if (this.closed) {
            return;
        }
        final InternalSession.InternalErrorHandler handler = internalSession.getErrorHandler();
        final TopicCacheEntry entry = this.byPath.get(str);
        if (entry == null) {
            final String message = "Data loss on topic " + str + " - possibly due to reconnection : session closing";
            handler.notifyError(new SessionErrorImpl(message, null));
            internalSession.close();
            return;
        }
        entry.handleDelta(content, this.fallbackProxies, handler);
    }

    /**
     * Removes the topic with the given ID from both indexes and notifies its
     * streams of the unsubscription. Unknown IDs are silently ignored.
     *
     * @param i topic ID
     * @param unsubscribeReason reason passed on to the streams
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    @InboundThreadOnly
    public void notifyUnsubscription(int i, Topics.UnsubscribeReason unsubscribeReason) {
        final TopicCacheEntry removedById = this.byId.remove(Integer.valueOf(i));
        if (removedById == null) {
            return;
        }
        final TopicCacheEntry removedByPath = this.byPath.remove(removedById.getTopicPath());
        // Decompiled form of an assertion that both indexes held the same entry.
        if (!$assertionsDisabled && removedById != removedByPath) {
            throw new AssertionError(removedById + " != " + removedByPath);
        }
        removedById.notifyStreamsOfUnsubscription(this.fallbackProxies, unsubscribeReason);
    }

    /**
     * Registers a value stream for the topics selected by {@code topicSelector}.
     *
     * @param internalSession session whose error handler is used when existing topics are attached
     * @param topicSelector selector the stream is registered under
     * @param cls value class the stream consumes
     * @param valueStream the stream to register
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.StreamRegistry
    public <V> void addStream(InternalSession internalSession, TopicSelector topicSelector, Class<V> cls, Topics.ValueStream<V> valueStream) {
        final StreamProxy proxy = createValueStreamProxy(cls, valueStream);
        addStreamProxy(internalSession, topicSelector, proxy);
    }

    /**
     * Wraps a value stream in the proxy matching its value class: a bytes proxy
     * for {@code Bytes}, a content proxy for {@code Content}, and otherwise a
     * data-type proxy bound to the topic type named after the data type
     * registered for {@code cls}.
     *
     * @param cls value class the stream consumes
     * @param valueStream the stream to wrap
     * @return the proxy for {@code valueStream}
     */
    private <V> StreamProxy createValueStreamProxy(Class<V> cls, Topics.ValueStream<V> valueStream) {
        if (cls == Bytes.class) {
            return new BytesValueStreamProxy(valueStream);
        }
        if (cls == Content.class) {
            return new ContentValueStreamProxy(valueStream);
        }
        final String typeName = this.dataTypes.getByClass(cls).getTypeName();
        // Locale.ROOT: the default-locale overload would turn 'i' into a dotted 'İ'
        // under e.g. a Turkish locale, making TopicType.valueOf fail.
        final TopicType topicType = TopicType.valueOf(typeName.toUpperCase(Locale.ROOT));
        return new DataTypeValueStreamProxy(topicType, valueStream);
    }

    /**
     * Registers a topic stream for the topics selected by {@code topicSelector}.
     *
     * @param internalSession session whose error handler is used when existing topics are attached
     * @param topicSelector selector the stream is registered under
     * @param topicStream the stream to register
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.StreamRegistry
    public void addStream(InternalSession internalSession, TopicSelector topicSelector, Topics.TopicStream topicStream) {
        final StreamProxy proxy = new TopicStreamProxy(topicStream);
        addStreamProxy(internalSession, topicSelector, proxy);
    }

    /**
     * Schedules registration of {@code streamProxy} under {@code topicSelector}.
     *
     * <p>When the task runs, the proxy is added to the selector's proxy set; if
     * the set reports no change (the same set instance is returned) nothing else
     * happens. Otherwise the proxy is also added to every cached topic whose
     * path the selector selects.
     *
     * @param internalSession session whose error handler is passed to the cache entries
     * @param topicSelector selector the proxy is registered under
     * @param streamProxy the proxy to register
     */
    private void addStreamProxy(final InternalSession internalSession, final TopicSelector topicSelector, final StreamProxy streamProxy) {
        execute(new RunIfNotClosed() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.2
            @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.RunIfNotClosed
            public void doRun() {
                final ImmutableSet<StreamProxy> current = TopicRoutingImpl.this.streamProxies.get(topicSelector);
                final ImmutableSet<StreamProxy> updated;
                if (current == null) {
                    updated = ImmutableSet.of(streamProxy);
                } else {
                    updated = current.with(streamProxy);
                    if (updated == current) {
                        // Already registered under this selector.
                        return;
                    }
                }
                TopicRoutingImpl.this.streamProxies.put(topicSelector, updated);
                for (TopicCacheEntry entry : TopicRoutingImpl.this.byId.values()) {
                    if (topicSelector.selects(entry.getTopicPath())) {
                        entry.addStream(streamProxy, TopicRoutingImpl.this.fallbackProxies, internalSession.getErrorHandler());
                    }
                }
            }
        });
    }

    /**
     * Registers a topic stream as a fallback stream.
     *
     * @param internalSession session whose error handler is used when existing topics are notified
     * @param topicStream the stream to register
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.StreamRegistry
    public void addFallbackStream(InternalSession internalSession, Topics.TopicStream topicStream) {
        final StreamProxy proxy = new TopicStreamProxy(topicStream);
        addFallbackStreamProxy(internalSession, proxy);
    }

    /**
     * Registers a value stream as a fallback stream.
     *
     * @param internalSession session whose error handler is used when existing topics are notified
     * @param cls value class the stream consumes
     * @param valueStream the stream to register
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.StreamRegistry
    public <V> void addFallbackStream(InternalSession internalSession, Class<V> cls, Topics.ValueStream<V> valueStream) {
        final StreamProxy proxy = createValueStreamProxy(cls, valueStream);
        addFallbackStreamProxy(internalSession, proxy);
    }

    /**
     * Schedules registration of {@code streamProxy} as a fallback proxy.
     *
     * <p>When the task runs, the proxy is added to the fallback set; if the set
     * reports no change (the same set instance is returned) nothing else
     * happens. Otherwise every cached topic is told about the new fallback
     * subscription.
     *
     * @param internalSession session whose error handler is passed to the cache entries
     * @param streamProxy the proxy to register
     */
    private void addFallbackStreamProxy(final InternalSession internalSession, final StreamProxy streamProxy) {
        execute(new RunIfNotClosed() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.3
            @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.RunIfNotClosed
            public void doRun() {
                final ImmutableSet<StreamProxy> current = TopicRoutingImpl.this.fallbackProxies;
                final ImmutableSet<StreamProxy> updated = current.with(streamProxy);
                if (updated == current) {
                    // Already registered as a fallback.
                    return;
                }
                TopicRoutingImpl.this.fallbackProxies = updated;
                for (TopicCacheEntry entry : TopicRoutingImpl.this.byId.values()) {
                    entry.notifyFallbackSubscription(streamProxy, internalSession.getErrorHandler());
                }
            }
        });
    }

    /**
     * Schedules removal of every proxy registered for {@code stream}.
     *
     * <p>When the task runs, the stream's proxy is removed from the fallback set
     * and from the per-selector sets. If a per-selector registration was
     * removed, every cached topic is asked to drop the stream as well. If
     * anything was removed at all, {@code stream.onClose()} is called; an
     * exception thrown by that callback is logged and not propagated.
     *
     * @param internalSession session whose error handler is passed to the cache entries
     * @param stream the stream to remove
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.StreamRegistry
    public void removeStream(final InternalSession internalSession, final Stream stream) {
        final Predicate<StreamProxy> isProxyForStream = new Predicate<StreamProxy>() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.4
            @Override // com.pushtechnology.diffusion.collections.Predicate
            public boolean test(StreamProxy streamProxy) {
                return streamProxy.isProxyFor(stream);
            }
        };
        execute(new RunIfNotClosed() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.5
            @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.RunIfNotClosed
            public void doRun() {
                boolean removed = TopicRoutingImpl.this.removeFallbackStream(isProxyForStream);
                if (TopicRoutingImpl.this.removeProxyStream(isProxyForStream)) {
                    removed = true;
                    final InternalSession.InternalErrorHandler errorHandler = internalSession.getErrorHandler();
                    for (TopicCacheEntry entry : TopicRoutingImpl.this.byId.values()) {
                        entry.removeStream(isProxyForStream, TopicRoutingImpl.this.fallbackProxies, errorHandler);
                    }
                }
                if (removed) {
                    try {
                        stream.onClose();
                    } catch (Exception e) {
                        // A failing user callback must not break the routing task.
                        TopicRoutingImpl.LOG.error("TOPICS_STREAM_EXCEPTION", stream, e);
                    }
                }
            }
        });
    }

    /**
     * Removes the first fallback proxy matching {@code predicate}, if any.
     *
     * <p>Decompiler note: the method was originally declared {@code private}.
     *
     * @param predicate selects the proxy to remove
     * @return {@code true} if the fallback set changed (a different set instance was produced)
     */
    @InboundThreadOnly
    public boolean removeFallbackStream(Predicate<StreamProxy> predicate) {
        final ImmutableSet<StreamProxy> before = this.fallbackProxies;
        final ImmutableSet<StreamProxy> after = before.withoutFirst(predicate);
        final boolean changed = after != before;
        if (changed) {
            this.fallbackProxies = after;
        }
        return changed;
    }

    /**
     * Removes, from every selector's proxy set, the first proxy matching
     * {@code predicate}. A selector whose set becomes empty is dropped from the
     * registry altogether.
     *
     * <p>Decompiler note: the method was originally declared {@code private}.
     *
     * @param predicate selects the proxy to remove
     * @return {@code true} if any selector's registration was changed or dropped
     */
    @InboundThreadOnly
    public boolean removeProxyStream(Predicate<StreamProxy> predicate) {
        boolean changed = false;
        final Iterator<Map.Entry<TopicSelector, ImmutableSet<StreamProxy>>> entries = this.streamProxies.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry = entries.next();
            final ImmutableSet<StreamProxy> remaining = entry.getValue().withoutFirst(predicate);
            if (remaining.isEmpty()) {
                entries.remove();
            } else if (remaining != entry.getValue()) {
                entry.setValue(remaining);
            } else {
                // Nothing matched under this selector.
                continue;
            }
            changed = true;
        }
        return changed;
    }

    /**
     * Collects the registered stream proxies that apply to a topic: those whose
     * selector selects the path and which accept the topic type.
     *
     * @param str topic path
     * @param topicType type of the topic
     * @return the matching proxies; empty if none are registered or none match
     */
    @InboundThreadOnly
    private ImmutableSet<StreamProxy> streamsForTopic(String str, TopicType topicType) {
        final Map<TopicSelector, ImmutableSet<StreamProxy>> registered = this.streamProxies;
        if (registered.isEmpty()) {
            return ImmutableSet.empty();
        }
        final Set<StreamProxy> matching = new HashSet<StreamProxy>();
        for (Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry : registered.entrySet()) {
            if (!entry.getKey().selects(str)) {
                continue;
            }
            for (StreamProxy proxy : entry.getValue()) {
                if (proxy.selectsTopicType(topicType)) {
                    matching.add(proxy);
                }
            }
        }
        return ImmutableSet.from(matching);
    }

    /**
     * Turns value caching off. The flag is consulted when a subscription is
     * notified, so entries created before this call are not replaced.
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    public void disableValueCaching() {
        cacheDisabled = true;
    }

    /**
     * Schedules the shutdown of this routing.
     *
     * <p>When the task runs it marks the routing closed, empties both topic
     * indexes, calls {@code onSessionClosed()} on every registered proxy
     * (fallback and per-selector, each distinct proxy once), and finally clears
     * the stream registries. Has no effect if the routing is already closed.
     */
    @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRouting
    public void close() {
        execute(new RunIfNotClosed() { // from class: com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.6
            @Override // com.pushtechnology.diffusion.client.internal.routing.TopicRoutingImpl.RunIfNotClosed
            public void doRun() {
                TopicRoutingImpl.this.closed = true;
                TopicRoutingImpl.this.byPath.clear();
                TopicRoutingImpl.this.byId.clear();

                // Gather into a set first: a proxy may be registered under several selectors.
                final Set<StreamProxy> allProxies = new HashSet<StreamProxy>();
                allProxies.addAll(TopicRoutingImpl.this.fallbackProxies);
                for (ImmutableSet<StreamProxy> proxies : TopicRoutingImpl.this.streamProxies.values()) {
                    allProxies.addAll(proxies);
                }
                for (StreamProxy proxy : allProxies) {
                    proxy.onSessionClosed();
                }

                TopicRoutingImpl.this.streamProxies.clear();
                TopicRoutingImpl.this.fallbackProxies = ImmutableSet.empty();
            }
        });
    }

    /**
     * Submits {@code runIfNotClosed} to the inbound thread pool, passing this
     * routing as the first argument, unless the routing is already closed.
     *
     * @param runIfNotClosed the task to schedule
     */
    private void execute(RunIfNotClosed runIfNotClosed) {
        if (!this.closed) {
            this.inboundThreadPool.execute(this, runIfNotClosed);
        }
    }

    // Static initialisation as emitted by the decompiler: sets up the assertion
    // flag and the class logger declared as blank finals at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; guards the check in notifyUnsubscription.
        $assertionsDisabled = !TopicRoutingImpl.class.desiredAssertionStatus();
        LOG = I18nLogger.getLogger((Class<?>) TopicRoutingImpl.class);
    }
}
