package lia.util.net.copy.transport;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.security.auth.Subject;
import lia.gsi.GSIServer;
import lia.gsi.net.GSIGssSocketFactory;
import lia.util.net.common.AbstractFDTCloseable;
import lia.util.net.common.Config;
import lia.util.net.common.DirectByteBufferPool;
import lia.util.net.common.FDTVersion;
import lia.util.net.common.Utils;

/* loaded from: input_file:lia/util/net/copy/transport/ControlChannel.class */
/**
 * Control connection between two FDT peers.
 *
 * <p>On construction the channel performs a handshake over the control socket: both sides
 * exchange a version message, then their configuration maps, then the session id (sent by the
 * side that already has one, read by the side that does not). After the handshake the channel is
 * meant to be driven by {@link #run()}, which alternates between flushing the outbound queue
 * filled by {@link #sendCtrlMessage(Object)} and reading one inbound object, handing received
 * objects to the {@link ControlChannelNotifier} on a separate daemon thread.
 *
 * <p>NOTE(review): this class was recovered from a decompiler; a few structural artifacts of that
 * process were repaired (see the comments on {@code sendMsgImpl}, {@code initStreams} and
 * {@code internalClose}).
 */
public class ControlChannel extends AbstractFDTCloseable implements Runnable {
    private static final Logger logger = Logger.getLogger(ControlChannel.class.getName());

    /** Version string this side advertises during the handshake. */
    private static final String LOCAL_VERSION = "0.24.0-2015-12-04";

    /* Control message tags used by this class; names follow the log/error texts below. */
    private static final int TAG_KEEP_ALIVE = 0;
    private static final int TAG_PROTOCOL_VERSION = 1;
    private static final int TAG_SESSION_ID = 2;
    private static final int TAG_INIT_FDT_CONF = 4;
    /** Sent while closing, and treated on receipt as "remote side is closing the session". */
    private static final int TAG_REMOTE_CLOSE = 12;

    /** Read timeout (ms) on the control socket; each expiry lets the main loop flush its send queue. */
    private static final int READ_POLL_TIMEOUT_MILLIS = 1000;

    private static final String REMOTE_FINAL_TIMEOUT_MSG =
            "Remote site will close the transfer session; FINAL timeout was reached. Most likely the TCP buffers on remote site are higher than normal. Try the blocking I/O -bio on both sides and no -ss.";

    private static final CtrlMsg versionMsg = new CtrlMsg(TAG_PROTOCOL_VERSION, LOCAL_VERSION);
    private static final Config config = Config.getInstance();

    /** Connect timeout in milliseconds for outgoing (non-GSI) control connections. */
    public static final int CONNECT_TIMEOUT = 20000;
    /** Not referenced inside this class; kept for external users. */
    public static final int SOCKET_TIMEOUT = 60000;

    private final Socket controlSocket;
    /** Outbound messages; filled by any thread, drained by the thread executing {@link #run()}. */
    private final ConcurrentLinkedQueue<Object> qToSend = new ConcurrentLinkedQueue<>();
    /** Guards {@link #cleanup()} so its body runs at most once. */
    private final AtomicBoolean cleanupFinished = new AtomicBoolean(false);
    private UUID fdtSessionID;
    private volatile ObjectOutputStream oos;
    private volatile ObjectInputStream ois;
    private final ControlChannelNotifier notifier;
    private volatile String fullRemoteVersion;
    /** Configuration map received from the remote peer during the handshake. */
    public Map<String, Object> remoteConf;
    public final InetAddress remoteAddress;
    public final int remotePort;
    public final int localPort;
    public volatile Subject subject;
    private volatile String myName;
    /** Handle of the scheduled keep-alive task, or {@code null} when keep-alive is not enabled. */
    private volatile ScheduledFuture<?> ccptFuture;

    /**
     * Periodic task that queues a keep-alive message on its control channel.
     */
    public static final class ControlChannelPingerTask implements Runnable {
        public static final CtrlMsg pingMsg = new CtrlMsg(TAG_KEEP_ALIVE, new byte[1]);
        private final ControlChannel cc;

        ControlChannelPingerTask(ControlChannel controlChannel) {
            this.cc = controlChannel;
            ControlChannel.logger.log(Level.INFO, "[ ControlChannelPingerTask ] initialized");
        }

        /** Queues {@link #pingMsg}; failures are logged and never propagated to the scheduler. */
        @Override
        public void run() {
            if (ControlChannel.logger.isLoggable(Level.FINER)) {
                ControlChannel.logger.log(Level.FINER, "[ ControlChannelPingerTask ] sending KEEP_ALIVE_MSG");
            }
            try {
                this.cc.sendCtrlMessage(pingMsg);
            } catch (Throwable th) {
                ControlChannel.logger.log(Level.WARNING, " [ ContrlChannelPingerTask ] Unable to send msg  ... Close the socket ??? This should not happen", th);
            }
        }
    }

    /**
     * Connects to a remote peer identified by host name.
     *
     * @param str host name or literal address, resolved with {@link InetAddress#getByName(String)}
     * @param i remote control port
     * @param uuid session id to announce, or {@code null} to receive one from the peer
     * @param controlChannelNotifier receiver of inbound messages and of the session-down event
     * @throws Exception if resolution, connection or the handshake fails
     */
    public ControlChannel(String str, int i, UUID uuid, ControlChannelNotifier controlChannelNotifier) throws Exception {
        this(InetAddress.getByName(str), i, uuid, controlChannelNotifier);
    }

    /**
     * Connects to a remote peer and performs the handshake.
     *
     * <p>When GSI mode is enabled in the configuration the socket is created by
     * {@link GSIGssSocketFactory} against the configured GSI port (the {@code i} argument is then
     * only recorded in {@link #remotePort}); otherwise a plain socket is connected to {@code i}
     * and a single zero byte is written before the object streams are opened.
     *
     * @param inetAddress remote address
     * @param i remote control port
     * @param uuid session id to announce, or {@code null} to receive one from the peer
     * @param controlChannelNotifier receiver of inbound messages and of the session-down event
     * @throws Exception wrapping any failure; the channel is closed before it is thrown
     */
    public ControlChannel(InetAddress inetAddress, int i, UUID uuid, ControlChannelNotifier controlChannelNotifier) throws Exception {
        try {
            this.notifier = controlChannelNotifier;
            this.fdtSessionID = uuid;
            final boolean gsiMode = config.isGSIModeEnabled();
            if (gsiMode) {
                this.controlSocket = new GSIGssSocketFactory().createSocket(inetAddress, config.getGSIPort(), false, false);
                this.subject = GSIGssSocketFactory.getLocalSubject(this.controlSocket);
            } else {
                this.controlSocket = new Socket();
                this.controlSocket.connect(new InetSocketAddress(inetAddress, i), CONNECT_TIMEOUT);
            }
            this.remoteAddress = inetAddress;
            this.remotePort = i;
            this.localPort = this.controlSocket.getLocalPort();
            this.controlSocket.setTcpNoDelay(true);
            if (!config.isGSIModeEnabled()) {
                this.controlSocket.getOutputStream().write(new byte[]{0});
            }
            initStreams();
            this.controlSocket.setSoTimeout(READ_POLL_TIMEOUT_MILLIS);
        } catch (Throwable th) {
            close("Cannot instantiate ControlChannel", th);
            throw new Exception(th);
        }
    }

    /**
     * @return {@code true} when there is no control socket or it has been closed
     */
    public boolean isSocketClosed() {
        return this.controlSocket == null || this.controlSocket.isClosed();
    }

    /**
     * Wraps an already connected socket and performs the handshake. No session id is supplied, so
     * it is read from the peer.
     *
     * @param socket connected control socket
     * @param controlChannelNotifier receiver of inbound messages and of the session-down event
     * @throws Exception wrapping any failure; the channel is closed before it is thrown
     */
    public ControlChannel(Socket socket, ControlChannelNotifier controlChannelNotifier) throws Exception {
        // Identical to the GSI variant with no authenticated subject.
        this((GSIServer) null, socket, (Subject) null, controlChannelNotifier);
    }

    /**
     * Wraps an already connected socket that carries an authenticated subject and performs the
     * handshake.
     *
     * @param gSIServer not used by this constructor
     * @param socket connected control socket
     * @param subject subject to expose through {@link #subject}; may be {@code null}
     * @param controlChannelNotifier receiver of inbound messages and of the session-down event
     * @throws Exception wrapping any failure; the channel is closed before it is thrown
     */
    public ControlChannel(GSIServer gSIServer, Socket socket, Subject subject, ControlChannelNotifier controlChannelNotifier) throws Exception {
        try {
            this.controlSocket = socket;
            this.subject = subject;
            this.remoteAddress = socket.getInetAddress();
            this.remotePort = socket.getPort();
            this.localPort = socket.getLocalPort();
            this.notifier = controlChannelNotifier;
            initStreams();
            this.controlSocket.setSoTimeout(READ_POLL_TIMEOUT_MILLIS);
        } catch (Throwable th) {
            close("Cannot instantiate ControlChannel", th);
            throw new Exception(th);
        }
    }

    /**
     * @return the session id supplied at construction or received during the handshake
     */
    public UUID fdtSessionID() {
        return this.fdtSessionID;
    }

    @Override
    public String toString() {
        return this.controlSocket == null ? "null" : this.controlSocket.toString();
    }

    /**
     * Opens the object streams and runs the handshake: version exchange, configuration exchange,
     * buffer pool initialisation, session id exchange and, when supported, keep-alive scheduling.
     *
     * @throws Exception on I/O failure or when the peer sends an unexpected message
     */
    private void initStreams() throws Exception {
        this.oos = new ObjectOutputStream(new BufferedOutputStream(this.controlSocket.getOutputStream()));
        sendMsgImpl(versionMsg);

        // NOTE(review): everything read from this stream is Java-native deserialization of data
        // supplied by the peer. Consider restricting accepted classes (e.g. an ObjectInputFilter)
        // if the peer is not fully trusted.
        this.ois = new ObjectInputStream(new BufferedInputStream(this.controlSocket.getInputStream()));

        final CtrlMsg remoteVersionMsg = (CtrlMsg) this.ois.readObject();
        if (remoteVersionMsg.tag != TAG_PROTOCOL_VERSION) {
            throw new FDTProcolException("Unexpected remote control message. Expected PROTOCOL_VERSION tag [ 1 ] Received tag: " + remoteVersionMsg.tag);
        }
        this.fullRemoteVersion = (String) remoteVersionMsg.message;

        sendMsgImpl(new CtrlMsg(TAG_INIT_FDT_CONF, Config.getInstance().getConfigMap()));
        final CtrlMsg remoteConfMsg = (CtrlMsg) this.ois.readObject();
        if (remoteConfMsg.tag != TAG_INIT_FDT_CONF) {
            throw new FDTProcolException("Unexpected remote control message. Expected INIT_FDT_CONF tag [ 4 ] Received tag: " + remoteConfMsg.tag);
        }
        // The element types cannot be checked at runtime; the cast to HashMap itself still is.
        @SuppressWarnings("unchecked")
        final HashMap<String, Object> receivedConf = (HashMap<String, Object>) remoteConfMsg.message;
        this.remoteConf = receivedConf;

        // Only the pool initialisation is reported as a buffer pool failure; later handshake
        // errors keep their own, more accurate, exception.
        initBufferPool();
        exchangeSessionId();

        this.myName = " ControlThread for ( " + this.fdtSessionID + " ) " + this.controlSocket.getInetAddress() + ":" + this.controlSocket.getPort();
        logger.log(Level.INFO, "NEW CONTROL stream for " + this.fdtSessionID + " initialized ");

        scheduleKeepAlive();
    }

    /** Initialises the shared buffer pool using the remote {@code -bs} configuration value. */
    private void initBufferPool() throws Exception {
        try {
            final int remoteBufferSize = Integer.parseInt((String) this.remoteConf.get("-bs"));
            final boolean created = DirectByteBufferPool.initInstance(remoteBufferSize, config.getMaxTakePollIter());
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, created ? "The buffer pool has been initialized" : "The buffer pool is already initialized");
            }
        } catch (Throwable th) {
            throw new FDTProcolException("Unable to instantiate the buffer pool", th);
        }
    }

    /** Reads the session id from the peer when none is known locally, otherwise announces it. */
    private void exchangeSessionId() throws Exception {
        if (this.fdtSessionID != null) {
            sendMsgImpl(new CtrlMsg(TAG_SESSION_ID, this.fdtSessionID));
            return;
        }
        final CtrlMsg sessionMsg = (CtrlMsg) this.ois.readObject();
        if (sessionMsg.tag != TAG_SESSION_ID) {
            throw new FDTProcolException("Unexpected remote control message. Expected SESSION_ID tag [ 2 ] Received tag: " + sessionMsg.tag);
        }
        this.fdtSessionID = (UUID) sessionMsg.message;
    }

    /**
     * Schedules the keep-alive pinger unless the remote version is unknown or older than 0.9.8.
     * The period is the smaller of the local delay and the remote {@code -ka} value (seconds);
     * a missing or negative remote value falls back to the local delay.
     */
    private void scheduleKeepAlive() {
        final long localDelayNanos = Config.getInstance().getKeepAliveDelay(TimeUnit.NANOSECONDS);
        final String remoteKeepAlive = (String) this.remoteConf.get("-ka");
        long remoteDelayNanos = localDelayNanos;
        if (remoteKeepAlive != null) {
            remoteDelayNanos = TimeUnit.SECONDS.toNanos(Long.parseLong(remoteKeepAlive));
        }
        if (remoteDelayNanos < 0) {
            remoteDelayNanos = localDelayNanos;
        }

        if (this.fullRemoteVersion == null || Utils.compareVersions(this.fullRemoteVersion, "0.9.8") < 0) {
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "[ ControlChannel ] remote version " + this.fullRemoteVersion + " does not support KEEP_ALIVE messages");
            }
            return;
        }

        synchronized (this.closeLock) {
            final FDTVersion localVersion = FDTVersion.fromVersionString(LOCAL_VERSION);
            final FDTVersion remoteVersion = FDTVersion.fromVersionString(this.fullRemoteVersion);
            final long periodNanos = Math.min(localDelayNanos, remoteDelayNanos);
            final long periodSeconds = TimeUnit.NANOSECONDS.toSeconds(periodNanos);
            final String humanPeriod = periodSeconds > 0
                    ? periodSeconds + " second(s)"
                    : TimeUnit.NANOSECONDS.toMillis(periodNanos) + " millis";
            logger.log(Level.INFO, "App KeepAlive [ " + humanPeriod + " ] enabled for control channel. Local " + localVersion + ", Remote " + remoteVersion);
            this.ccptFuture = Utils.getMonitoringExecService().scheduleWithFixedDelay(new ControlChannelPingerTask(this), periodNanos, periodNanos, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * @return the version string received from the peer during the handshake
     */
    public String remoteVersion() {
        return this.fullRemoteVersion;
    }

    /**
     * Releases the channel's resources exactly once: cancels the keep-alive task, closes both
     * object streams and the socket, then tells the notifier (if any) that the session is down.
     * Subsequent calls are no-ops.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally private; it is
     * left public so existing callers keep compiling.
     */
    public final void cleanup() {
        if (!this.cleanupFinished.compareAndSet(false, true)) {
            return;
        }
        Utils.cancelFutureIgnoringException(this.ccptFuture, false);
        Utils.closeIgnoringExceptions(this.ois);
        Utils.closeIgnoringExceptions(this.oos);
        Utils.closeIgnoringExceptions(this.controlSocket);
        if (this.notifier == null) {
            return;
        }
        try {
            this.notifier.notifyCtrlSessionDown(this, downCause());
        } catch (Throwable th) {
            // Best effort: a failing listener must not break cleanup, but leave a trace.
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "[ ControlChannel ] notifier threw while handling session down", th);
            }
        }
    }

    /**
     * Queues a message for sending. The message is written to the socket by the thread executing
     * {@link #run()} on its next pass.
     *
     * @param obj message to send; must not be {@code null}
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    public void sendCtrlMessage(Object obj) {
        if (obj == null) {
            throw new NullPointerException("Control message cannot be null over the ControlChannel");
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "[ CtrlChannel ] adding to send queue msg: " + obj.toString());
            if (logger.isLoggable(Level.FINEST)) {
                Thread.dumpStack();
            }
        }
        this.qToSend.add(obj);
    }

    /** Writes every currently queued message to the socket, in queue order. */
    private void sendAllMsgs() throws Exception {
        for (Object next = this.qToSend.poll(); next != null; next = this.qToSend.poll()) {
            sendMsgImpl(next);
        }
    }

    /**
     * Serialises one message onto the control socket and flushes it.
     *
     * <p>The decompiled form carried an empty {@code finally} reading a never-assigned local,
     * which does not compile; it has been dropped. Write failures propagate to the caller
     * (constructor or main loop), both of which close the channel.
     *
     * @param obj message to write
     * @throws Exception if serialisation or the socket write fails
     */
    private void sendMsgImpl(Object obj) throws Exception {
        if (logger.isLoggable(Level.FINE)) {
            logger.log(Level.FINE, " [ ControlChannel ] sending message " + obj);
        }
        this.oos.writeObject(obj);
        // Drop the stream's back-reference table so a re-sent object is written in full.
        this.oos.reset();
        this.oos.flush();
    }

    /**
     * Main loop of the control channel. Starts a daemon thread that delivers inbound messages to
     * the notifier, then, until the socket is closed, flushes the outbound queue and reads one
     * object (the read times out via the socket timeout so queued messages keep flowing). On exit
     * the channel is closed, reusing an already recorded down message/cause when there is one.
     */
    @Override
    public void run() {
        final ArrayBlockingQueue<Object> inbound = new ArrayBlockingQueue<>(10);
        startNotifierThread(inbound);
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, this.myName + " STARTED main loop");
        }

        String failureMessage = null;
        Throwable failureCause = null;
        while (this.controlSocket != null && !this.controlSocket.isClosed()) {
            try {
                if (!exchangeOnce(inbound)) {
                    break;
                }
            } catch (Throwable loopError) {
                // Only reachable when the handlers inside exchangeOnce themselves fail.
                try {
                    if (!isClosed()) {
                        failureMessage = this.myName + " got exception in main loop: " + loopError.getMessage();
                        failureCause = loopError;
                        if (logger.isLoggable(Level.FINER)) {
                            logger.log(Level.FINER, "Control Thread for " + this.myName + " got exception in main loop", loopError);
                        }
                    }
                    if (downMessage() == null && downCause() == null) {
                        close(failureMessage, failureCause);
                    } else {
                        close(downMessage(), downCause());
                    }
                } catch (Throwable fatal) {
                    closeKeepingRecordedCause();
                    throw fatal;
                }
            }
        }
        closeKeepingRecordedCause();
        logger.log(Level.INFO, this.myName + " FINISHED");
    }

    /**
     * One pass of the main loop: flush the outbound queue, read one object and queue it for the
     * notifier thread.
     *
     * @param inbound hand-off queue towards the notifier thread
     * @return {@code false} when the peer announced it is closing and the loop must stop,
     *         {@code true} otherwise
     */
    private boolean exchangeOnce(ArrayBlockingQueue<Object> inbound) {
        try {
            sendAllMsgs();
            final Object received = this.ois.readObject();
            if (received == null) {
                return true;
            }
            final boolean fine = logger.isLoggable(Level.FINE);
            if (fine) {
                logger.log(Level.FINE, " [ ControlChannel ] received msg: " + received);
            }
            if (received instanceof CtrlMsg) {
                final CtrlMsg ctrlMsg = (CtrlMsg) received;
                if (ctrlMsg.tag == TAG_KEEP_ALIVE) {
                    if (fine) {
                        logger.log(Level.FINE, "Ctrl channel received app KEEP_ALIVE_MSG");
                    }
                } else if (ctrlMsg.tag == TAG_REMOTE_CLOSE) {
                    if (!isClosed()) {
                        logger.log(Level.WARNING, REMOTE_FINAL_TIMEOUT_MSG);
                        close(REMOTE_FINAL_TIMEOUT_MSG, null);
                    }
                    return false;
                }
            }
            // add() throws when the 10-slot queue is full; that lands in the generic handler
            // below and closes the channel.
            // NOTE(review): confirm that closing on a slow notifier is intended.
            inbound.add(received);
        } catch (SocketTimeoutException ignored) {
            // Expected: the read timeout is what lets the loop service the send queue.
        } catch (IOException e) {
            close("Control channel got I/O Exception", e);
            cleanup();
        } catch (Throwable th) {
            logger.log(Level.WARNING, "[ ControlChannel ] unexpected exception in main loop", th);
            close("Control channel got general exception. Will close!", th);
            cleanup();
        }
        return true;
    }

    /** Closes the channel, reusing the recorded down message and cause when either is set. */
    private void closeKeepingRecordedCause() {
        if (downMessage() == null && downCause() == null) {
            close(null, null);
        } else {
            close(downMessage(), downCause());
        }
    }

    /**
     * Starts the daemon thread that takes objects from {@code inbound} and passes them to the
     * notifier until the control socket is closed. Any exception closes and cleans up the channel.
     */
    private void startNotifierThread(final ArrayBlockingQueue<Object> inbound) {
        Thread notifierThread = new Thread() {
            @Override
            public void run() {
                setName("INotifier for: " + ControlChannel.this.myName);
                while (ControlChannel.this.controlSocket != null && !ControlChannel.this.controlSocket.isClosed()) {
                    try {
                        // Bounded wait so the loop re-checks the socket state once per second.
                        final Object msg = inbound.poll(1L, TimeUnit.SECONDS);
                        if (msg == null) {
                            continue;
                        }
                        if (ControlChannel.logger.isLoggable(Level.FINEST)) {
                            ControlChannel.logger.log(Level.FINEST, "[ ControlChannel ] [ INotifier ] notifying msg: " + msg);
                        }
                        ControlChannel.this.notifier.notifyCtrlMsg(ControlChannel.this, msg);
                    } catch (Throwable th) {
                        if (ControlChannel.logger.isLoggable(Level.FINER)) {
                            StringBuilder sb = new StringBuilder();
                            sb.append("[ ControlChannel ] [ INotifier ] Got exception. ControlChannel isClosed(): ").append(ControlChannel.this.isClosed());
                            if (ControlChannel.this.isClosed()) {
                                sb.append(" downMessage: ").append(ControlChannel.this.downMessage()).append(" downCause: ").append(Utils.getStackTrace(ControlChannel.this.downCause()));
                            }
                            sb.append(" Inotifier Exception: ");
                            ControlChannel.logger.log(Level.FINER, sb.toString(), th);
                        }
                        ControlChannel.this.close("INotifier got exception ", th);
                        ControlChannel.this.cleanup();
                    }
                }
            }
        };
        notifierThread.setDaemon(true);
        notifierThread.start();
    }

    /**
     * Close hook: starts a daemon thread that, once per second for up to three attempts, queues a
     * closing message (down message plus stack trace of the down cause) while the socket is still
     * open, and then performs {@link #cleanup()}. If the thread cannot be started, cleanup is run
     * directly.
     */
    @Override
    protected void internalClose() {
        try {
            Thread stopper = new Thread() {
                @Override
                public void run() {
                    setName("(ML) ControlChannel Graceful stopper thread");
                    // cleanup() must run once, after the whole grace period: running it inside
                    // the loop would close the socket right after the first message is queued,
                    // before the main loop had a chance to send it.
                    try {
                        for (int attempt = 0; attempt < 3; attempt++) {
                            try {
                                Thread.sleep(1000L);
                            } catch (Throwable ignored) {
                                // Interruption only shortens the grace period.
                            }
                            if (ControlChannel.this.controlSocket == null || ControlChannel.this.controlSocket.isClosed()) {
                                break;
                            }
                            ControlChannel.this.qToSend.add(new CtrlMsg(TAG_REMOTE_CLOSE, ControlChannel.this.downMessage() + Utils.getStackTrace(ControlChannel.this.downCause())));
                        }
                    } finally {
                        ControlChannel.this.cleanup();
                    }
                }
            };
            stopper.setDaemon(true);
            stopper.start();
        } catch (Throwable th) {
            try {
                cleanup();
            } catch (Throwable th2) {
                logger.log(Level.WARNING, "Exception in cleanup()", th2);
            }
        }
    }
}
