package edu.mit.media.ie.shair.middleware.net.cloud;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import edu.mit.media.ie.shair.middleware.common.AbstractEventExchanger;
import edu.mit.media.ie.shair.middleware.common.Peer;
import edu.mit.media.ie.shair.middleware.common.RawMessage;
import edu.mit.media.ie.shair.middleware.common.SerializationUtils;
import edu.mit.media.ie.shair.middleware.event.LostPeerEvent;
import edu.mit.media.ie.shair.middleware.event.MessageReceivedEvent;
import edu.mit.media.ie.shair.middleware.event.MessageSentEvent;
import edu.mit.media.ie.shair.middleware.event.NetworkStartedEvent;
import edu.mit.media.ie.shair.middleware.event.NetworkStoppedEvent;
import edu.mit.media.ie.shair.middleware.event.NewPeerEvent;
import edu.mit.media.ie.shair.middleware.net.NetworkDriver;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
public class RabbitMQNetworkDriver extends AbstractEventExchanger implements NetworkDriver {
    /** Routing key used for broadcast publishes; the local queue is bound to it in {@link #connect()}. */
    private static final String BROADCAST = "broadcast";
    /** Initial value of the peer timeout interval, in milliseconds. */
    public static final int DEFAULT_PEER_TIMEOUT_INTERVAL_MILLISECONDS = 30000;
    /** Initial value of the reconnection interval, in milliseconds. */
    public static final int DEFAULT_RECONNECTION_INTERVAL_MILLISECONDS = 5000;
    /** AMQP channel opened by {@link #connect()}; {@code null} until the first connection attempt. */
    private Channel channel;
    /** AMQP connection opened by {@link #connect()}; {@code null} until the first connection attempt. */
    private Connection connection;
    /** Reconnection task; only present while started with a positive reconnection interval. */
    private ConnectionJob connectionJob;
    /** Consumer registered on the local queue by {@link #connect()}. */
    private Consumer consumer;
    /** Name of the direct exchange, derived from the network tag in {@link #connect()}. */
    private String dataExchangeName;
    /** Executor on which the reconnection and peer-timeout jobs schedule themselves. */
    private final ScheduledExecutorService executor;
    private String host;
    private final Peer localPeer;
    /** Name of the queue declared for this peer; regenerated on every {@link #connect()}. */
    private String localQueue;
    private String networkTag;
    /** Peer liveness task; present between {@link #start()} and {@link #stop()}. */
    private PeerTimeoutJob peerTimeoutJob;
    private int port;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private int reconnectionIntervalMS = DEFAULT_RECONNECTION_INTERVAL_MILLISECONDS;
    private int peerTimeoutIntervalMS = DEFAULT_PEER_TIMEOUT_INTERVAL_MILLISECONDS;
    /** Started flag; never reassigned, only flipped through compare-and-set. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Known remote peers, keyed by the queue name they advertised. */
    private final BiMap<String, Peer> queueNamesToPeers = HashBiMap.create();
    /** Timestamp (from {@code System.currentTimeMillis()}) of the last message accepted from each peer. */
    private final Map<Peer, Long> lastActivity = Collections.synchronizedMap(new HashMap<Peer, Long>());

    /* loaded from: classes.dex */
    /**
     * Self-rescheduling task that attempts to connect whenever the driver is
     * started but not connected. Each run first schedules the next run after
     * {@code reconnectionIntervalMS} milliseconds, then attempts the connection.
     */
    private class ConnectionJob implements Runnable {
        // Written by terminate() on the caller's thread and read by run() on the
        // executor thread; volatile makes the termination request visible there.
        private volatile boolean terminated;

        private ConnectionJob() {
            this.terminated = false;
        }

        /* synthetic */ ConnectionJob(RabbitMQNetworkDriver rabbitMQNetworkDriver, ConnectionJob connectionJob) {
            this();
        }

        /** Reschedules itself, then connects if the driver is started and currently disconnected. */
        @Override // java.lang.Runnable
        public void run() {
            if (this.terminated) {
                return;
            }
            // Reschedule before doing any work so that a failing attempt cannot break the retry chain.
            RabbitMQNetworkDriver.this.executor.schedule(this, RabbitMQNetworkDriver.this.reconnectionIntervalMS, TimeUnit.MILLISECONDS);
            if (!RabbitMQNetworkDriver.this.started.get() || RabbitMQNetworkDriver.this.isConnected()) {
                return;
            }
            try {
                RabbitMQNetworkDriver.this.connect();
            } catch (IOException e) {
                RabbitMQNetworkDriver.this.log("Connect exception (" + e.getMessage() + ")");
            }
        }

        /** Stops the job: the next run returns immediately and does not reschedule itself. */
        void terminate() {
            this.terminated = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes.dex */
    /**
     * Consumer attached to the local queue. Decodes each delivery and forwards
     * advertise and meta messages to the enclosing driver; anything else is ignored.
     */
    public class LocalConsumer extends DefaultConsumer {
        LocalConsumer(Channel channel) {
            super(channel);
            RabbitMQNetworkDriver.this.log("Consumer initialized.");
        }

        /**
         * Handles one delivery from the local queue. Deliveries are dropped while
         * the driver is not connected.
         *
         * @param str consumer tag (unused)
         * @param envelope delivery envelope (unused)
         * @param basicProperties message properties (unused)
         * @param bArr serialized message body
         * @throws IOException declared by the overridden method
         */
        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public synchronized void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            if (!RabbitMQNetworkDriver.this.isConnected()) {
                return;
            }
            // NOTE(review): the body comes from the broker; if byteArrayToObject uses native Java
            // deserialization this is an untrusted-deserialization risk. Confirm and restrict accepted classes.
            Object decoded = SerializationUtils.byteArrayToObject(bArr);
            if (decoded instanceof AdvertiseMessage) {
                AdvertiseMessage advertise = (AdvertiseMessage) decoded;
                RabbitMQNetworkDriver.this.handleAdvertiseMessage(new Peer(advertise.getSender()), advertise.getQueueName(), advertise.isReply());
            } else if (decoded instanceof MetaMessage) {
                MetaMessage meta = (MetaMessage) decoded;
                Object payload = meta.getEncapsulatedMessage();
                // A remote peer controls the payload type: reject anything that is not a RawMessage
                // instead of letting a ClassCastException escape from the consumer callback.
                if (payload != null && !(payload instanceof RawMessage)) {
                    RabbitMQNetworkDriver.this.log("METAMESSAGE from " + meta.getSender() + " carries an unexpected payload type (" + payload.getClass().getName() + "). Message ignored.");
                    return;
                }
                RabbitMQNetworkDriver.this.handleMetaMessage(new Peer(meta.getSender()), meta.getQueueName(), (RawMessage) payload);
            }
        }
    }

    /* loaded from: classes.dex */
    /**
     * Self-rescheduling liveness task. On each run, while the driver is started
     * and connected, every known peer is checked against its last recorded
     * activity: a peer silent for more than one timeout interval is sent an
     * advertise request, and a peer silent for more than two intervals is unbound.
     */
    private class PeerTimeoutJob implements Runnable {
        // Written by terminate() on the caller's thread and read by run() on the
        // executor thread; volatile makes the termination request visible there.
        private volatile boolean terminated;

        private PeerTimeoutJob() {
            this.terminated = false;
        }

        /* synthetic */ PeerTimeoutJob(RabbitMQNetworkDriver rabbitMQNetworkDriver, PeerTimeoutJob peerTimeoutJob) {
            this();
        }

        /** Reschedules itself, then pings or unbinds inactive peers. */
        @Override // java.lang.Runnable
        public void run() {
            if (this.terminated) {
                return;
            }
            final RabbitMQNetworkDriver driver = RabbitMQNetworkDriver.this;
            driver.executor.schedule(this, driver.peerTimeoutIntervalMS, TimeUnit.MILLISECONDS);
            if (!driver.started.get() || !driver.isConnected()) {
                return;
            }
            // Widen to long before doubling so very large intervals cannot overflow int arithmetic.
            final long interval = driver.peerTimeoutIntervalMS;
            final long now = System.currentTimeMillis();
            for (Peer peer : driver.getPeers()) {
                Long lastSeen = driver.lastActivity.get(peer);
                String queueName;
                // The peer map is mutated by synchronized methods of the driver; read it under the same monitor.
                synchronized (driver) {
                    queueName = driver.queueNamesToPeers.inverse().get(peer);
                }
                if (lastSeen == null || queueName == null) {
                    continue;
                }
                if (lastSeen.longValue() < now - (2 * interval)) {
                    driver.log("PING Timeout for peer " + peer + ". Unbinding it because it is probably disconnected.");
                    driver.unbind(queueName);
                } else if (lastSeen.longValue() < now - interval) {
                    driver.log("Sending Ping Advertisting for peer " + peer + ". Because it is inactive.");
                    driver.rawSendToOne(peer, new AdvertiseMessage(driver.localPeer.getPeerId(), driver.localQueue, false));
                }
            }
        }

        /** Stops the job: the next run returns immediately and does not reschedule itself. */
        void terminate() {
            this.terminated = true;
        }
    }

    public RabbitMQNetworkDriver(Peer peer, String str, int i, String str2, ScheduledExecutorService scheduledExecutorService) {
        this.localPeer = peer;
        this.port = i;
        this.host = str;
        this.executor = scheduledExecutorService;
        this.networkTag = str2;
        log("INIT " + str + ":" + i + " using network tag: " + str2);
    }

    /**
     * Associates a remote peer with the queue it advertised and reports a
     * {@link NewPeerEvent}. Does nothing if exactly this binding already exists.
     * Any conflicting binding (the peer on another queue, or another peer on this
     * queue) is removed through {@link #unbind(String)} first, so that a
     * {@link LostPeerEvent} is reported for every binding that is dropped.
     *
     * @param peer remote peer to bind; binding the local peer is refused
     * @param str queue name advertised by the peer
     */
    private synchronized void bind(Peer peer, String str) {
        String currentQueue = this.queueNamesToPeers.inverse().get(peer);
        Peer currentOccupant = this.queueNamesToPeers.get(str);
        if (str.equals(currentQueue) && peer.equals(currentOccupant)) {
            return; // already bound exactly like this
        }
        if (peer.equals(this.localPeer)) {
            log("Fatal: cannot bind myself!");
            return;
        }
        if (currentQueue != null && !currentQueue.equals(str)) {
            log("Receive BIND for the peer " + peer + " on queue " + str + " but it already exists on queue " + currentQueue + ". Removing existing queue.");
            unbind(currentQueue);
        }
        log("Received BIND message for peer " + peer + " on queue " + str);
        if (currentOccupant != null) {
            // The queue is held by a different peer. Previously forcePut evicted it silently,
            // leaving its lastActivity entry behind and skipping both peer events.
            log("Fatal: existing peer " + currentOccupant + " was already bound on queue " + str + " !");
            unbind(str);
        }
        this.queueNamesToPeers.forcePut(str, peer);
        log("BIND completed. Reporting NewPeerEvent " + peer + " to the middleware.");
        sendEvent(new NewPeerEvent(peer));
    }

    /**
     * Guard used by operations that require a started driver.
     *
     * @throws IllegalStateException if the driver is not started
     */
    private void checkIsStarted() {
        if (isStarted()) {
            return;
        }
        throw new IllegalStateException("Network not started!");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes an advertise message: binds the sender to its queue, records its
     * activity and, when the message was a request, answers with an advertise reply.
     * Messages from the local peer are ignored.
     *
     * @param peer sender of the advertise message
     * @param str queue name announced by the sender
     * @param z {@code true} if the message is a reply, {@code false} if it is a request
     */
    public synchronized void handleAdvertiseMessage(Peer peer, String str, boolean z) {
        if (peer.equals(localPeer)) {
            return;
        }
        final String prefix = z ? "ADVERTISING REPLY from " : "ADVERTISING REQUEST from ";
        log(prefix + peer + " queue: " + str);
        bind(peer, str);
        lastActivity.put(peer, Long.valueOf(System.currentTimeMillis()));
        if (z) {
            // Replies are never answered.
            return;
        }
        log("SENDING ADVERTISING to peer " + peer + " regarding local queue: " + localQueue);
        final AdvertiseMessage reply = new AdvertiseMessage(localPeer.getPeerId(), localQueue, true);
        rawSendToOne(peer, reply);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a data message. It is accepted only when the sender is currently
     * bound to the queue it names; then the sender's activity timestamp is updated
     * and a {@link MessageReceivedEvent} is reported. Messages from the local peer
     * are dropped without logging.
     *
     * @param peer sender of the message
     * @param str queue name the sender put in the message
     * @param rawMessage payload to hand to the middleware
     */
    public synchronized void handleMetaMessage(Peer peer, String str, RawMessage rawMessage) {
        if (peer.equals(localPeer)) {
            return;
        }
        if (!isBound(peer, str)) {
            log("METAMESSAGE " + rawMessage + " from unknown binding: " + peer + " on queue " + str + ". Message ignored.");
            return;
        }
        log("METAMESSAGE " + rawMessage + " from: " + peer + " on queue " + str + " accepted. Reporting to middleware.");
        final long receivedAt = System.currentTimeMillis();
        lastActivity.put(peer, Long.valueOf(receivedAt));
        sendEvent(new MessageReceivedEvent(peer, rawMessage));
    }

    /**
     * Tells whether a remote peer is currently bound to the given queue.
     *
     * @param peer peer to look up
     * @param str queue name to look up
     * @return {@code true} only if {@code peer} is not the local peer and the
     *         peer map associates {@code str} with {@code peer} in both directions
     */
    private synchronized boolean isBound(Peer peer, String str) {
        String boundQueue = this.queueNamesToPeers.inverse().get(peer);
        Peer boundPeer = this.queueNamesToPeers.get(str);
        // Return the condition directly: the previous form returned a local that was
        // left unassigned whenever the condition was false.
        return !peer.equals(this.localPeer) && str.equals(boundQueue) && peer.equals(boundPeer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs a debug message prefixed with the local peer id.
     *
     * @param str message text
     */
    public void log(String str) {
        // Parameterized form: the final string is only assembled when debug logging is enabled.
        this.logger.debug("[RabbitMQ {}] {}", this.localPeer.getPeerId(), str);
    }

    /**
     * Publishes an object on the exchange with the broadcast routing key.
     * {@code MetaMessage} and {@code AdvertiseMessage} instances are serialized as
     * they are; anything else is first wrapped in a {@code MetaMessage} carrying
     * the local peer id and local queue. Failures are logged, not thrown.
     *
     * @param obj object to broadcast
     */
    private synchronized void rawSendToAll(Object obj) {
        if (!isConnected()) {
            log("NOT Sending a broadcast message (Message: " + obj + "): Network not connected!");
            return;
        }
        log("Sending a broadcast message (Message: " + obj + ")");
        try {
            final boolean isProtocolMessage = obj instanceof MetaMessage || obj instanceof AdvertiseMessage;
            final Object outgoing = isProtocolMessage ? obj : new MetaMessage(localPeer.getPeerId(), obj, localQueue);
            channel.basicPublish(dataExchangeName, BROADCAST, null, SerializationUtils.objectToByteArray(outgoing));
            log("Broadcast Message (Message: " + obj + ") has been sent.");
        } catch (IOException sendFailure) {
            log("Broadcast Message (Message: " + obj + ") has NOT been sent: " + sendFailure.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes an object on the exchange using the target peer's id as routing
     * key. {@code MetaMessage} and {@code AdvertiseMessage} instances are
     * serialized as they are; anything else is first wrapped in a
     * {@code MetaMessage}. Sending to the local peer is refused. Failures are
     * logged, not thrown.
     *
     * @param peer destination peer
     * @param obj object to send
     */
    public synchronized void rawSendToOne(Peer peer, Object obj) {
        if (!isConnected()) {
            log("NOT Sending a message to " + peer + " (Message: " + obj + "): Network not connected!");
            return;
        }
        log("Sending a message to " + peer + " (Message: " + obj + ")");
        if (peer.equals(localPeer)) {
            log("Cannot send a message to self");
            return;
        }
        try {
            final boolean isProtocolMessage = obj instanceof MetaMessage || obj instanceof AdvertiseMessage;
            final Object outgoing = isProtocolMessage ? obj : new MetaMessage(localPeer.getPeerId(), obj, localQueue);
            channel.basicPublish(dataExchangeName, peer.getPeerId(), null, SerializationUtils.objectToByteArray(outgoing));
            log("Message to " + peer + " (Message: " + obj + ") has been sent.");
        } catch (IOException sendFailure) {
            log("Message to " + peer + " (Message: " + obj + ") has NOT been sent: " + sendFailure.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the binding of a queue, forgets the activity timestamp of the peer
     * that was bound to it, and reports a {@link LostPeerEvent} for that peer.
     * An unknown queue is only logged.
     *
     * @param str name of the queue to unbind
     */
    public synchronized void unbind(String str) {
        log("Received UNBIND message for queue " + str);
        final Peer unboundPeer = queueNamesToPeers.remove(str);
        lastActivity.remove(unboundPeer);
        if (unboundPeer != null) {
            log("UNBIND completed. " + unboundPeer + " successfully unbound from queue " + str + ". Reporting disconnection to the middleware.");
            sendEvent(new LostPeerEvent(unboundPeer));
            return;
        }
        log("Unable to unbind " + str + ": unknown queue");
    }

    /**
     * Opens a fresh connection and channel to the broker, declares the exchange
     * and a new local queue, binds the queue to the local peer id and to the
     * broadcast key, starts consuming, and broadcasts an advertise request.
     * An existing connection is closed first and all known peers are forgotten.
     * If setup does not complete, the partially opened connection is closed so
     * that repeated attempts do not accumulate open connections.
     *
     * @throws IOException if the broker cannot be reached or setup fails
     * @throws IllegalStateException if the driver is not started
     */
    public synchronized void connect() throws IOException {
        checkIsStarted();
        log("Trying CONNECT to " + this.host + ":" + this.port);
        if (isConnected()) {
            log("CONNECT WARNING. Client already connected. Disconnecting...");
            disconnect();
        }
        // isConnected() is also false when the connection is open but the channel is not;
        // release such a leftover connection before replacing the reference.
        abandonConnection();
        this.queueNamesToPeers.clear();
        this.lastActivity.clear();
        this.dataExchangeName = this.networkTag + "-ShAir";
        this.localQueue = this.localPeer.getPeerId() + "_" + System.currentTimeMillis();
        log("LOCAL QUEUE is: " + this.localQueue);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.host);
        connectionFactory.setPort(this.port);
        this.connection = connectionFactory.newConnection();
        boolean established = false;
        try {
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.dataExchangeName, "direct");
            this.channel.queueDeclare(this.localQueue, false, false, true, null);
            this.channel.basicQos(0, 1, false);
            this.channel.queueBind(this.localQueue, this.dataExchangeName, this.localPeer.getPeerId());
            this.channel.queueBind(this.localQueue, this.dataExchangeName, BROADCAST);
            this.consumer = new LocalConsumer(this.channel);
            this.channel.basicConsume(this.localQueue, true, this.consumer);
            log("CONNECTION almost completed. Sending an advertise message to all the peers.");
            rawSendToAll(new AdvertiseMessage(this.localPeer.getPeerId(), this.localQueue, false));
            established = isConnected();
        } finally {
            if (!established) {
                abandonConnection();
            }
        }
        log(established ? "CONNECT successful" : "CONNECT fail");
    }

    /** Closes the current connection if it is still open and clears the connection and channel references. */
    private void abandonConnection() {
        Connection stale = this.connection;
        this.channel = null;
        this.connection = null;
        if (stale == null || !stale.isOpen()) {
            return;
        }
        try {
            stale.close();
        } catch (IOException closeFailure) {
            log("Closing stale connection failed: " + closeFailure.getMessage());
        } catch (RuntimeException closeFailure) {
            // Presumably raised when the connection shuts down between isOpen() and close();
            // must not mask the failure that triggered the cleanup.
            log("Closing stale connection failed: " + closeFailure.getMessage());
        }
    }

    /**
     * Closes the channel and the connection, if connected, and forgets all known
     * peers. Closing is best effort: failures are logged and do not abort the
     * disconnection.
     *
     * @throws IllegalStateException if the driver is not started
     */
    public synchronized void disconnect() {
        checkIsStarted();
        log("Trying DISCONNECT");
        if (!isConnected()) {
            log("Client already disconnected");
            return;
        }
        this.logger.warn("DISCONNECTING");
        try {
            this.channel.close();
        } catch (IOException channelFailure) {
            // Best effort: keep going so the connection itself still gets closed.
            log("Channel close failed: " + channelFailure.getMessage());
        }
        try {
            this.connection.close();
        } catch (IOException connectionFailure) {
            log("Connection close failed: " + connectionFailure.getMessage());
        }
        this.queueNamesToPeers.clear();
        this.lastActivity.clear();
        if (isConnected()) {
            log("DISCONNECT fail");
        } else {
            log("DISCONNECT success");
        }
    }

    /** @return the broker host used by {@link #connect()} */
    public String getHost() {
        return host;
    }

    /** @return the peer this driver was constructed for */
    @Override // edu.mit.media.ie.shair.middleware.net.NetworkDriver
    public Peer getLocalPeer() {
        return localPeer;
    }

    /** @return the network tag from which the exchange name is derived on connect */
    public String getNetworkTag() {
        return networkTag;
    }

    /** @return the peer timeout interval, in milliseconds */
    public int getPeerTimeoutIntervalMS() {
        return peerTimeoutIntervalMS;
    }

    /**
     * Returns a snapshot of the currently bound remote peers.
     *
     * @return a new set, independent of the driver's internal state
     * @throws IllegalStateException if the driver is not started
     */
    @Override // edu.mit.media.ie.shair.middleware.net.NetworkDriver
    public Collection<Peer> getPeers() {
        checkIsStarted();
        // The peer map is mutated by synchronized methods of this driver; copy it under the
        // same monitor so the copy cannot observe a map in the middle of an update.
        synchronized (this) {
            return new HashSet<Peer>(this.queueNamesToPeers.values());
        }
    }

    /** @return the broker port used by {@link #connect()} */
    public int getPort() {
        return port;
    }

    /** @return the reconnection interval, in milliseconds; {@code 0} means no reconnection job */
    public int getReconnectionIntervalMS() {
        return reconnectionIntervalMS;
    }

    /**
     * Tells whether the driver currently has a usable broker connection.
     *
     * @return {@code true} only if the driver is started and both the channel and
     *         the connection exist and report themselves open
     */
    public synchronized boolean isConnected() {
        // Return the condition directly: the previous form returned a local that was
        // left unassigned whenever the condition was false.
        return this.started.get()
                && this.channel != null
                && this.connection != null
                && this.channel.isOpen()
                && this.connection.isOpen();
    }

    /** @return {@code true} between a successful {@link #start()} and the following {@link #stop()} */
    @Override // edu.mit.media.ie.shair.middleware.common.Startable
    public boolean isStarted() {
        return started.get();
    }

    /**
     * Broadcasts a message to all known peers and reports a
     * {@link MessageSentEvent} naming them. Nothing is sent when no peer is known
     * or when the driver is not connected. Synchronized like
     * {@code sendToOne} and {@code sendToMany}, because it reads the peer map
     * that synchronized methods mutate.
     *
     * @param rawMessage message to broadcast
     * @throws IllegalStateException if the driver is not started
     */
    @Override // edu.mit.media.ie.shair.middleware.net.NetworkDriver
    public synchronized void sendToAll(RawMessage rawMessage) {
        checkIsStarted();
        if (!isConnected()) {
            log("NOT Sending a broadcast message (Message: " + rawMessage + "): Network not connected!");
            return;
        }
        if (this.queueNamesToPeers.isEmpty()) {
            return;
        }
        rawSendToAll(rawMessage);
        sendEvent(new MessageSentEvent(getPeers(), rawMessage));
    }

    /**
     * Sends a message to each of the given peers that is currently known. A
     * single {@link MessageSentEvent} naming the retained recipients is reported
     * before the individual sends. Unknown peers are skipped silently.
     *
     * @param collection requested recipients; not modified
     * @param rawMessage message to send
     * @throws IllegalStateException if the driver is not started
     */
    @Override // edu.mit.media.ie.shair.middleware.net.NetworkDriver
    public synchronized void sendToMany(Collection<Peer> collection, RawMessage rawMessage) {
        checkIsStarted();
        if (!isConnected()) {
            log("NOT Sending a message to " + collection + " (Message: " + rawMessage + "): Network not connected!");
            return;
        }
        // Work on a copy so the caller's collection is left untouched.
        HashSet<Peer> recipients = new HashSet<Peer>(collection);
        recipients.retainAll(this.queueNamesToPeers.values());
        if (recipients.isEmpty()) {
            return;
        }
        sendEvent(new MessageSentEvent(recipients, rawMessage));
        for (Peer recipient : recipients) {
            rawSendToOne(recipient, rawMessage);
        }
    }

    /**
     * Sends a message to one peer and reports a {@link MessageSentEvent}. A peer
     * that is not currently known is skipped silently.
     *
     * @param peer destination peer
     * @param rawMessage message to send
     * @throws IllegalStateException if the driver is not started
     */
    @Override // edu.mit.media.ie.shair.middleware.net.NetworkDriver
    public synchronized void sendToOne(Peer peer, RawMessage rawMessage) {
        checkIsStarted();
        if (isConnected()) {
            final boolean knownPeer = queueNamesToPeers.values().contains(peer);
            if (knownPeer) {
                rawSendToOne(peer, rawMessage);
                sendEvent(new MessageSentEvent(peer, rawMessage));
            }
            return;
        }
        log("NOT Sending a message to " + peer + " (Message: " + rawMessage + "): Network not connected!");
    }

    /**
     * Sets the broker host. The value is read by {@link #connect()}.
     *
     * @param str broker host
     */
    public void setHost(String str) {
        host = str;
    }

    /**
     * Sets the network tag. The value is read by {@link #connect()} to build the exchange name.
     *
     * @param str network tag
     */
    public void setNetworkTag(String str) {
        networkTag = str;
    }

    /**
     * Sets the peer timeout interval.
     *
     * @param i interval in milliseconds; must be at least 1
     * @throws IllegalArgumentException if {@code i} is less than 1
     */
    public void setPeerTimeoutIntervalMS(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("peerTimeoutIntervalMS must be positive");
        }
        peerTimeoutIntervalMS = i;
    }

    /**
     * Sets the broker port. The value is read by {@link #connect()}.
     *
     * @param i port number in the range 1..65535
     * @throws IllegalArgumentException if {@code i} is outside that range
     */
    public void setPort(int i) {
        final boolean validPort = i >= 1 && i <= 65535;
        if (!validPort) {
            throw new IllegalArgumentException("Port must be between 1 and 65535");
        }
        port = i;
    }

    /**
     * Sets the reconnection interval. While the driver is started, setting it to
     * {@code 0} terminates the running reconnection job, and setting it to a
     * positive value submits a new job if none exists.
     *
     * @param i interval in milliseconds; {@code 0} disables reconnection
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public synchronized void setReconnectionIntervalMS(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("reconnectionIntervalMS must be non-negative");
        }
        final boolean disabling = (i == 0);
        if (disabling && connectionJob != null && isStarted()) {
            connectionJob.terminate();
            connectionJob = null;
        }
        reconnectionIntervalMS = i;
        if (!disabling && connectionJob == null && isStarted()) {
            connectionJob = new ConnectionJob(this, null);
            executor.submit(connectionJob);
        }
    }

    /**
     * Starts the driver: flips the started flag, reports a
     * {@link NetworkStartedEvent}, and submits the peer-timeout job and, when the
     * reconnection interval is positive, the reconnection job. The connection
     * itself is established by the reconnection job.
     *
     * @throws IllegalStateException if the driver is already started
     */
    @Override // edu.mit.media.ie.shair.middleware.common.Startable
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            throw new IllegalStateException("Network driver already started");
        }
        sendEvent(new NetworkStartedEvent());
        this.peerTimeoutJob = new PeerTimeoutJob(this, null);
        if (this.reconnectionIntervalMS > 0) {
            this.connectionJob = new ConnectionJob(this, null);
            this.executor.submit(this.connectionJob);
        }
        this.executor.submit(this.peerTimeoutJob);
    }

    /**
     * Stops the driver: disconnects if connected, clears the started flag,
     * terminates the background jobs and reports a {@link NetworkStoppedEvent}.
     *
     * @throws IllegalStateException if the driver is not started
     */
    @Override // edu.mit.media.ie.shair.middleware.common.Startable
    public void stop() {
        // Disconnect and flip the flag under the same monitor that connect() takes, so the
        // reconnection job cannot open a new connection between the two steps.
        synchronized (this) {
            if (isConnected()) {
                disconnect();
            }
            if (!this.started.compareAndSet(true, false)) {
                throw new IllegalStateException("Network driver already stopped");
            }
            if (this.connectionJob != null) {
                this.connectionJob.terminate();
            }
            // start() assigns the job after setting the flag, so it may still be null here.
            if (this.peerTimeoutJob != null) {
                this.peerTimeoutJob.terminate();
            }
            this.connectionJob = null;
            this.peerTimeoutJob = null;
        }
        sendEvent(new NetworkStoppedEvent());
    }
}
