package com.pushtechnology.diffusion.client.features.impl;

import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.internal.session.ClosedSessionListener;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.session.SessionErrorImpl;
import com.pushtechnology.diffusion.client.internal.streams.MessageStreamRegistry;
import com.pushtechnology.diffusion.client.internal.streams.RequestStreamRegistry;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.types.SendOptions;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingResponse;
import com.pushtechnology.diffusion.command.commands.send.MessagingSendRequest;
import com.pushtechnology.diffusion.command.commands.send.SendRequest;
import com.pushtechnology.diffusion.command.sender.ReferenceCallback;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.command.services.definition.StandardServices;
import com.pushtechnology.diffusion.content.ContentImpl;
import com.pushtechnology.diffusion.datatype.Bytes;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.io.bytes.ArrayIBytes;
import com.pushtechnology.diffusion.java7.Functions;
import com.pushtechnology.diffusion.topics.selectors.TopicSelectorParser;
import com.pushtechnology.diffusion.types.SendOptionsImpl;
import java8.util.Objects;
import java8.util.concurrent.CompletableFuture;
import net.jcip.annotations.Immutable;

/* loaded from: input_file:com/pushtechnology/diffusion/client/features/impl/MessagingImpl.class */
public class MessagingImpl extends AbstractFeature implements Messaging {
    private final RequestStreamRegistry requestStreams;
    private final MessageStreamRegistry messageStreams;
    private final DataTypes dataTypes;
    private final TopicSelectorParser topicSelectorParser;
    private final ServiceReference<SendRequest, Void> sendService;
    private final ServiceReference<MessagingSendRequest, MessagingResponse> messagingSendService;
    private static final Messaging.SendContextCallback<Messaging.SendCallback> CONTEXT_CALLBACK_ADAPTER = new SendContextCallbackAdapter();
    private static final SendOptions DEFAULT_SEND_OPTIONS = new SendOptionsImpl.Builder().build();

    @Immutable
    /* loaded from: input_file:com/pushtechnology/diffusion/client/features/impl/MessagingImpl$SendContextCallbackAdapter.class */
    private static class SendContextCallbackAdapter implements Messaging.SendContextCallback<Messaging.SendCallback> {
        private SendContextCallbackAdapter() {
        }

        @Override // com.pushtechnology.diffusion.client.features.ContextCallback
        public void onDiscard(Messaging.SendCallback sendCallback) {
            sendCallback.onDiscard();
        }

        @Override // com.pushtechnology.diffusion.client.features.Messaging.SendContextCallback
        public void onComplete(Messaging.SendCallback sendCallback) {
            sendCallback.onComplete();
        }
    }

    public MessagingImpl(Session session, InternalSession internalSession, TopicSelectorParser topicSelectorParser, DataTypes dataTypes, RequestStreamRegistry requestStreamRegistry, MessageStreamRegistry messageStreamRegistry) {
        super(session, internalSession);
        this.topicSelectorParser = topicSelectorParser;
        this.requestStreams = requestStreamRegistry;
        this.messageStreams = messageStreamRegistry;
        this.dataTypes = dataTypes;
        this.sendService = internalSession.getServiceLocator().obtainService(StandardServices.SEND);
        this.messagingSendService = internalSession.getServiceLocator().obtainService(StandardServices.MESSAGING_SEND);
        internalSession.addListener(new ClosedSessionListener() { // from class: com.pushtechnology.diffusion.client.features.impl.MessagingImpl.1
            @Override // com.pushtechnology.diffusion.client.internal.session.ClosedSessionListener
            public void onClosed() {
                MessagingImpl.this.messageStreams.discardAll();
                MessagingImpl.this.requestStreams.discardAll();
            }
        });
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public CompletableFuture<?> send(String str, CharSequence charSequence) {
        return send(str, new ContentImpl((CharSequence) Objects.requireNonNull(charSequence, "message is null")), DEFAULT_SEND_OPTIONS);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public CompletableFuture<?> send(String str, Bytes bytes) {
        return send(str, bytes, DEFAULT_SEND_OPTIONS);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public CompletableFuture<?> send(String str, Bytes bytes, SendOptions sendOptions) {
        return this.sendService.sendCommand(newRequest(str, bytes, sendOptions)).thenApply(Functions.identity());
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public <T, R> CompletableFuture<R> sendRequest(String str, T t, Class<T> cls, Class<R> cls2) {
        DataType byClass = this.dataTypes.getByClass((Class) Objects.requireNonNull(cls, "request type is null"));
        Objects.requireNonNull(cls2, "response type is null");
        return (CompletableFuture<R>) this.messagingSendService.sendCommand(new MessagingSendRequest((String) Objects.requireNonNull(str, "path is null"), byClass, ArrayIBytes.toIBytes(byClass.toBytes(t)))).thenApply(messagingResponse -> {
            return messagingResponse.getDataType().readAs(cls2, messagingResponse.getResponse());
        });
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void send(String str, CharSequence charSequence, Messaging.SendCallback sendCallback) {
        send(str, new ContentImpl((CharSequence) Objects.requireNonNull(charSequence, "message is null")), DEFAULT_SEND_OPTIONS, sendCallback);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void send(String str, Bytes bytes, Messaging.SendCallback sendCallback) {
        send(str, bytes, DEFAULT_SEND_OPTIONS, sendCallback);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void send(String str, Bytes bytes, SendOptions sendOptions, Messaging.SendCallback sendCallback) {
        send(str, bytes, sendOptions, Objects.requireNonNull(sendCallback, "callback is null"), CONTEXT_CALLBACK_ADAPTER);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public <C> void send(String str, CharSequence charSequence, C c, Messaging.SendContextCallback<C> sendContextCallback) {
        send(str, new ContentImpl((CharSequence) Objects.requireNonNull(charSequence, "message is null")), DEFAULT_SEND_OPTIONS, c, sendContextCallback);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public <C> void send(String str, Bytes bytes, C c, Messaging.SendContextCallback<C> sendContextCallback) {
        send(str, bytes, DEFAULT_SEND_OPTIONS, c, sendContextCallback);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public <C> void send(String str, Bytes bytes, SendOptions sendOptions, final C c, final Messaging.SendContextCallback<C> sendContextCallback) {
        Objects.requireNonNull(bytes, "message is null");
        Objects.requireNonNull(sendContextCallback, "callback is null");
        this.sendService.sendCommand((ServiceReference<SendRequest, Void>) newRequest(str, bytes, sendOptions), new ReferenceCallback<Void>() { // from class: com.pushtechnology.diffusion.client.features.impl.MessagingImpl.2
            @Override // com.pushtechnology.diffusion.command.sender.ReferenceCallback
            public void onResponse(Void r4) {
                sendContextCallback.onComplete(c);
            }

            @Override // com.pushtechnology.diffusion.command.sender.ReferenceCallback
            public void onFailure(Throwable th) {
                MessagingImpl.this.internalSession().getErrorHandler().notifyError(new SessionErrorImpl(th.toString(), th));
                sendContextCallback.onDiscard(c);
            }
        });
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void addMessageStream(TopicSelector topicSelector, Messaging.MessageStream messageStream) {
        this.messageStreams.add((TopicSelector) Objects.requireNonNull(topicSelector, "topics is null"), (Messaging.MessageStream) Objects.requireNonNull(messageStream, "stream is null"));
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void addMessageStream(String str, Messaging.MessageStream messageStream) throws IllegalArgumentException {
        addMessageStream(this.topicSelectorParser.parse((String) Objects.requireNonNull(str, "topics is null")), messageStream);
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void addFallbackMessageStream(Messaging.MessageStream messageStream) {
        this.messageStreams.addFallback((Messaging.MessageStream) Objects.requireNonNull(messageStream, "stream is null"));
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public <T, R> Messaging.RequestStream<?, ?> setRequestStream(String str, Class<? extends T> cls, Class<? super R> cls2, Messaging.RequestStream<T, R> requestStream) {
        return this.requestStreams.add((String) Objects.requireNonNull(str, "path is null"), (Class) Objects.requireNonNull(cls, "request type is null"), (Class) Objects.requireNonNull(cls2, "response type is null"), (Messaging.RequestStream) Objects.requireNonNull(requestStream, "stream is null"));
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public void removeMessageStream(Messaging.MessageStream messageStream) {
        this.messageStreams.remove((Messaging.MessageStream) Objects.requireNonNull(messageStream, "stream is null"));
    }

    @Override // com.pushtechnology.diffusion.client.features.Messaging
    public Messaging.RequestStream<?, ?> removeRequestStream(String str) {
        return this.requestStreams.remove((String) Objects.requireNonNull(str, "path is null"));
    }

    @Override // com.pushtechnology.diffusion.client.types.SendOptions.Builder.Factory
    public SendOptions.Builder sendOptionsBuilder() {
        return new SendOptionsImpl.Builder();
    }

    private static SendRequest newRequest(String str, Bytes bytes, SendOptions sendOptions) {
        return new SendRequest((String) Objects.requireNonNull(str, "path is null"), ContentImpl.bytesToContent((Bytes) Objects.requireNonNull(bytes, "message is null")), (SendOptions) Objects.requireNonNull(sendOptions, "options is null"));
    }
}
