/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.rca.net;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.AppContext;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.net.NetClient;
import org.opensearch.performanceanalyzer.rca.framework.core.Node;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.messages.DataMsg;
import org.opensearch.performanceanalyzer.rca.messages.IntentMsg;
import org.opensearch.performanceanalyzer.rca.messages.UnicastIntentMsg;
import org.opensearch.performanceanalyzer.rca.net.NodeStateManager;
import org.opensearch.performanceanalyzer.rca.net.ReceivedFlowUnitStore;
import org.opensearch.performanceanalyzer.rca.net.SubscriptionManager;
import org.opensearch.performanceanalyzer.rca.net.tasks.BroadcastSubscriptionTxTask;
import org.opensearch.performanceanalyzer.rca.net.tasks.FlowUnitTxTask;
import org.opensearch.performanceanalyzer.rca.net.tasks.UnicastSubscriptionTxTask;
import org.opensearch.performanceanalyzer.rca.util.ClusterUtils;

public class WireHopper {
    private static final Logger LOG = LogManager.getLogger(WireHopper.class);
    private static final int MS_IN_S = 1000;
    private final NetClient netClient;
    private final SubscriptionManager subscriptionManager;
    private final NodeStateManager nodeStateManager;
    private final AtomicReference<ExecutorService> executorReference;
    private final ReceivedFlowUnitStore receivedFlowUnitStore;
    private final AppContext appContext;

    public WireHopper(NodeStateManager nodeStateManager, NetClient netClient, SubscriptionManager subscriptionManager, AtomicReference<ExecutorService> executorReference, ReceivedFlowUnitStore receivedFlowUnitStore, AppContext appContext) {
        this.netClient = netClient;
        this.subscriptionManager = subscriptionManager;
        this.nodeStateManager = nodeStateManager;
        this.executorReference = executorReference;
        this.receivedFlowUnitStore = receivedFlowUnitStore;
        this.appContext = appContext;
    }

    public void sendIntent(IntentMsg msg) {
        ExecutorService executor = this.executorReference.get();
        if (executor != null) {
            try {
                executor.execute(new BroadcastSubscriptionTxTask(this.netClient, msg, this.subscriptionManager, this.nodeStateManager, this.appContext));
            }
            catch (RejectedExecutionException ree) {
                LOG.warn("Dropped sending subscription because the threadpool queue is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
    }

    public void sendData(DataMsg msg) {
        ExecutorService executor = this.executorReference.get();
        if (executor != null) {
            try {
                executor.execute(new FlowUnitTxTask(this.netClient, this.subscriptionManager, msg, this.appContext));
            }
            catch (RejectedExecutionException ree) {
                LOG.warn("Dropped sending flow unit because the threadpool queue is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
    }

    @VisibleForTesting
    public AppContext getAppContext() {
        return this.appContext;
    }

    public List<FlowUnitMessage> readFromWire(Node<?> node) {
        String nodeName = node.name();
        long intervalInSeconds = node.getEvaluationIntervalSeconds();
        ImmutableList<FlowUnitMessage> remoteFlowUnits = this.receivedFlowUnitStore.drainNode(nodeName);
        Set<InstanceDetails.Id> publisherSet = this.subscriptionManager.getPublishersForNode(nodeName);
        for (InstanceDetails.Id publisher : publisherSet) {
            if (ClusterUtils.isHostIdInCluster(publisher, this.appContext.getAllClusterInstances())) continue;
            this.subscriptionManager.unsubscribeAndTerminateConnection(nodeName, publisher);
        }
        ImmutableList<InstanceDetails> hostsToSubscribeTo = this.nodeStateManager.getStaleOrNotSubscribedNodes(nodeName, 2L * intervalInSeconds * 1000L, publisherSet);
        for (InstanceDetails instance : hostsToSubscribeTo) {
            ExecutorService executor = this.executorReference.get();
            if (executor == null) continue;
            try {
                executor.execute(new UnicastSubscriptionTxTask(this.netClient, new UnicastIntentMsg("", nodeName, node.getTags(), instance), this.subscriptionManager, this.nodeStateManager, this.appContext));
            }
            catch (RejectedExecutionException ree) {
                LOG.warn("Dropped sending subscription request because the threadpool queue is full");
                StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_THREADPOOL_QUEUE_FULL_ERROR);
            }
        }
        return remoteFlowUnits;
    }

    @VisibleForTesting
    public void shutdownAll() {
        this.executorReference.get().shutdown();
        this.netClient.stop();
        this.netClient.getConnectionManager().shutdown();
    }

    @VisibleForTesting
    public SubscriptionManager getSubscriptionManager() {
        return this.subscriptionManager;
    }

    @VisibleForTesting
    public NodeStateManager getNodeStateManager() {
        return this.nodeStateManager;
    }

    @VisibleForTesting
    public AtomicReference<ExecutorService> getExecutorReference() {
        return this.executorReference;
    }

    @VisibleForTesting
    public ReceivedFlowUnitStore getReceivedFlowUnitStore() {
        return this.receivedFlowUnitStore;
    }
}

