package rice.pastry.wire;

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import rice.pastry.Id;
import rice.pastry.NodeId;
import rice.pastry.PastryNode;
import rice.pastry.dist.DistCoalesedNodeHandle;
import rice.pastry.messaging.Message;
import rice.pastry.routing.RouteMessage;
import rice.pastry.wire.exception.DeserializationException;
import rice.pastry.wire.exception.ImproperlyFormattedMessageException;
import rice.pastry.wire.exception.NodeIsDeadException;
import rice.pastry.wire.messaging.datagram.PingMessage;
import rice.pastry.wire.messaging.socket.DisconnectMessage;
import rice.pastry.wire.messaging.socket.HelloMessage;
import rice.pastry.wire.messaging.socket.HelloResponseMessage;
import rice.pastry.wire.messaging.socket.SocketCommandMessage;
import rice.pastry.wire.messaging.socket.SocketTransportMessage;
import rice.pastry.wire.testing.WireFileProcessor;

/* loaded from: input_file:rice/pastry/wire/WireNodeHandle.class */
public class WireNodeHandle extends DistCoalesedNodeHandle implements SelectionKeyHandler {
    private transient long lastpingtime;
    private transient SocketChannelReader reader;
    private transient SocketChannelWriter writer;
    private transient SelectionKey key;
    private transient int state;
    public static final int STATE_USING_UDP = -1;
    public static final int STATE_USING_TCP = -2;
    public static final int STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT = -3;
    public static final int STATE_USING_UDP_WAITING_TO_DISCONNECT = -4;
    public static int MAX_UDP_MESSAGE_SIZE = DatagramManager.DATAGRAM_SEND_BUFFER_SIZE;
    public static int SOCKET_BUFFER_SIZE = 32768;
    public static int PING_THROTTLE = 5;
    transient int writeCtr;
    transient long writeTmr;
    transient PrintStream outputStream;

    public WireNodeHandle(InetSocketAddress inetSocketAddress, NodeId nodeId) {
        super(nodeId, inetSocketAddress);
        this.writeCtr = 0;
        this.writeTmr = 0L;
        this.outputStream = null;
        debug(new StringBuffer().append("creating Socket handle for node: ").append(nodeId).append(" address: ").append(inetSocketAddress).toString());
        this.lastpingtime = 0L;
        setState(-1, "ctor");
    }

    public WireNodeHandle(InetSocketAddress inetSocketAddress, NodeId nodeId, PastryNode pastryNode) {
        super(nodeId, inetSocketAddress);
        this.writeCtr = 0;
        this.writeTmr = 0L;
        this.outputStream = null;
        debug(new StringBuffer().append("creating Socket handle for node: ").append(nodeId).append(", local: ").append(pastryNode).append(" address: ").append(inetSocketAddress).toString());
        this.lastpingtime = 0L;
        setState(-1, "ctor");
        setLocalNode(pastryNode);
    }

    public int getState() {
        return this.state;
    }

    public boolean isWriteEnabled() {
        return (this.key.interestOps() & 4) != 0;
    }

    public synchronized void setKey(SelectionKey selectionKey, SocketCommandMessage socketCommandMessage) {
        debug(new StringBuffer().append("Got new key  (state == ").append(this.state).append(")").toString());
        if (this.state == -1) {
            this.key = selectionKey;
            selectionKey.attach(this);
            ((WirePastryNode) getLocalNode()).getSocketManager().openSocket(this);
            this.reader = new SocketChannelReader((WirePastryNode) getLocalNode(), this);
            this.writer = new SocketChannelWriter((WirePastryNode) getLocalNode(), socketCommandMessage, selectionKey, this);
            setState(-2, new StringBuffer().append("setKey1:").append(System.currentTimeMillis()).append(":").append(socketCommandMessage).append(",").append(selectionKey.interestOps()).append(",").append(selectionKey.channel()).toString());
            return;
        }
        markAlive();
        InetSocketAddress address = ((WireNodeHandle) getLocalNode().getLocalHandle()).getAddress();
        InetSocketAddress address2 = getAddress();
        debug(new StringBuffer().append("Found double socket... (state == ").append(this.state).append(")").toString());
        if (this.state != -2) {
            ((WirePastryNode) getLocalNode()).getSocketManager().openSocket(this);
        }
        if (getAddress(address.getAddress()) <= getAddress(address2.getAddress()) && (getAddress(address.getAddress()) != getAddress(address2.getAddress()) || address.getPort() <= address2.getPort())) {
            wireDebug(new StringBuffer().append("setKey2b:").append(socketCommandMessage).toString());
            selectionKey.attach(new StaleSKH());
            debug("Using our socket, letting other socket die...");
            return;
        }
        try {
            this.key.channel().close();
            this.key.cancel();
            this.key.attach(null);
            this.writer.reset(socketCommandMessage);
        } catch (IOException e) {
            System.out.println(new StringBuffer().append("ERROR closing unnecessary socket: ").append(e).toString());
        }
        this.writer.setKey(selectionKey);
        this.key = selectionKey;
        setState(-2, new StringBuffer().append("setKey2a:").append(socketCommandMessage).toString());
        selectionKey.attach(this);
        debug("Killing our socket, using new one...");
    }

    public void receiveSocketMessage(SocketCommandMessage socketCommandMessage) {
        if (socketCommandMessage instanceof DisconnectMessage) {
            debug(new StringBuffer().append("Received DisconnectMessage (state == ").append(this.state).append(")").toString());
            if (this.state == -2) {
                setState(-4, "recSockMsg");
                ((WirePastryNode) getLocalNode()).getSocketManager().closeSocket(this);
                return;
            } else if (this.state == -3) {
                close(null);
                return;
            } else {
                System.out.println(new StringBuffer().append("Recieved DisconnectMessage at non-connected socket - not fatal... (state == ").append(this.state).append(")").toString());
                return;
            }
        }
        if (!(socketCommandMessage instanceof HelloResponseMessage)) {
            System.out.println(new StringBuffer().append("Received unreconginzed SocketCommandMessage ").append(socketCommandMessage).append(" - dropping on floor").toString());
            return;
        }
        HelloResponseMessage helloResponseMessage = (HelloResponseMessage) socketCommandMessage;
        if (helloResponseMessage.getNodeId().equals((Id) getNodeId()) && helloResponseMessage.getDestination().equals((Id) getLocalNode().getNodeId())) {
            markAlive();
            this.writer.greetingReceived();
        } else {
            debug(new StringBuffer().append("Receved incorrect HelloMessageResponse for nodeId ").append(helloResponseMessage.getNodeId()).append(" - resetting.").toString());
            close(null);
        }
    }

    public void notifyKilled() {
        if (this.writer != null) {
            this.writer.notifyKilled();
        }
    }

    @Override // rice.pastry.dist.DistCoalesedNodeHandle
    public void receiveMessageImpl(Message message) {
        Iterator queue;
        assertLocalNode();
        WirePastryNode wirePastryNode = (WirePastryNode) getLocalNode();
        if (this.isLocal) {
            debug(new StringBuffer().append("Sending message ").append(message).append(" locally").toString());
            wirePastryNode.receiveMessage(message);
            return;
        }
        debug(new StringBuffer().append("Passing message ").append(message).append(" to the socket controller for writing (state == ").append(this.state).append(")").toString());
        wireDebug(new StringBuffer().append(WireFileProcessor.enquePrefix).append(message).toString());
        String str = null;
        if (this.writer != null) {
            str = new StringBuffer().append("").append(this.writer.queueSize()).toString();
        }
        try {
            String str2 = null;
            if (this.key != null) {
                str2 = new StringBuffer().append("").append(this.key.interestOps()).toString();
            }
            wireDebug(new StringBuffer().append("DBG:").append(str).append(",").append(str2).toString());
        } catch (CancelledKeyException e) {
            if (!((WirePastryNode) getLocalNode()).getSelectorManager().isAlive()) {
                SocketChannelWriter socketChannelWriter = this.writer;
                if (this.writer != null && (queue = this.writer.getQueue()) != null) {
                    notifyPotentiallyLostMessage(queue);
                }
                throw new NodeIsDeadException(e);
            }
            closeDueToError();
        }
        switch (this.state) {
            case -2:
                this.writer.enqueue(new SocketTransportMessage(message, this.nodeId));
                return;
            case -1:
                try {
                    if (messageSize(message) <= MAX_UDP_MESSAGE_SIZE) {
                        debug("Message is small enough to go over UDP - sending.");
                        ((WirePastryNode) getLocalNode()).getDatagramManager().write(this.nodeId, this.address, message);
                    } else {
                        debug("Message is too large - open up socket!");
                        LinkedList linkedList = new LinkedList();
                        linkedList.addFirst(new SocketTransportMessage(message, this.nodeId));
                        connectToRemoteNode(linkedList.iterator());
                    }
                    return;
                } catch (IOException e2) {
                    System.out.println(new StringBuffer().append("IOException serializing message ").append(message).append(" - cancelling message.").toString());
                    e2.printStackTrace();
                    return;
                }
            default:
                ((WirePastryNode) getLocalNode()).getDatagramManager().write(this.nodeId, this.address, message);
                return;
        }
    }

    public void notifyPotentiallyLostMessage(Iterator it) {
        while (it.hasNext()) {
            System.err.println(new StringBuffer().append("WNH: Potentially lost the message:").append(it.next()).toString());
        }
    }

    public void connectToRemoteNode(Iterator it) {
        if (this.state != -1) {
            if (it != null) {
                while (it.hasNext()) {
                    Object next = it.next();
                    debug(new StringBuffer().append("Enqueueing message ").append(next).append(" into socket channel writer.").toString());
                    this.writer.enqueue(next);
                }
                return;
            }
            return;
        }
        try {
            SocketChannel open = SocketChannel.open();
            open.socket().setSendBufferSize(SOCKET_BUFFER_SIZE);
            open.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE);
            open.configureBlocking(false);
            boolean connect = open.connect(this.address);
            InetSocketAddress inetSocketAddress = (InetSocketAddress) open.socket().getLocalSocketAddress();
            wireDebug(new StringBuffer().append("DBG:Opening socket to ").append(this.address).append(" from:").append(inetSocketAddress).toString());
            debug(new StringBuffer().append("Opening socket to ").append(this.address).append(" from:").append(inetSocketAddress).toString());
            SelectorManager selectorManager = ((WirePastryNode) getLocalNode()).getSelectorManager();
            Selector selector = selectorManager.getSelector();
            synchronized (selector) {
                if (connect) {
                    this.key = open.register(selector, 1);
                } else {
                    try {
                        this.key = open.register(selector, 9);
                    } catch (NullPointerException e) {
                        if (selectorManager.isAlive()) {
                            if (it != null) {
                                notifyPotentiallyLostMessage(it);
                            }
                            throw e;
                        }
                        if (it != null) {
                            notifyPotentiallyLostMessage(it);
                        }
                        throw new NodeIsDeadException(e);
                    }
                }
            }
            setKey(this.key, new HelloMessage((WirePastryNode) getLocalNode(), this.nodeId));
            if (it != null) {
                while (it.hasNext()) {
                    Object next2 = it.next();
                    debug(new StringBuffer().append("Enqueueing message ").append(next2).append(" into socket channel writer.").toString());
                    this.writer.enqueue(next2);
                }
            }
        } catch (IOException e2) {
            debug(new StringBuffer().append("IOException connecting to remote node ").append(this.address).toString());
            e2.printStackTrace();
            System.out.println(new StringBuffer().append("WireNodeHandle.connectToRemoteNode(): IOException connecting to remote node ").append(this.address).toString());
            setState(-2, "connToRemoteNode, exception");
            close(it);
        }
    }

    public synchronized void disconnect() {
        debug(new StringBuffer().append("Received disconnect request... (state == ").append(this.state).append(")").toString());
        if (this.state != -2) {
            System.out.println(new StringBuffer().append("Recieved disconnect request at non-connected socket - very bad... (state == ").append(this.state).append(")").toString());
        } else {
            setState(-3, "disconnect()");
            this.writer.enqueue(new DisconnectMessage());
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void accept(SelectionKey selectionKey) {
        System.out.println("PANIC: accept() called on WireNodeHandle!");
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void connect(SelectionKey selectionKey) {
        wireDebug("DBG:connect()");
        try {
            if (((SocketChannel) selectionKey.channel()).finishConnect()) {
                synchronized (((WirePastryNode) getLocalNode()).getSelectorManager().getSelector()) {
                    selectionKey.interestOps(selectionKey.interestOps() & (-9));
                }
            }
            debug(new StringBuffer().append("Found connectable channel - completed connection to ").append(this.address).toString());
        } catch (ConnectException e) {
            debug(new StringBuffer().append("ERROR connecting - cancelling. ").append(e).toString());
            close(this.writer.getQueue());
        } catch (SocketException e2) {
            debug(new StringBuffer().append("ERROR connecting - cancelling. ").append(e2).toString());
            close(this.writer.getQueue());
        } catch (IOException e3) {
            debug(new StringBuffer().append("ERROR connecting - cancelling. ").append(e3).toString());
            close(this.writer.getQueue());
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void write(SelectionKey selectionKey) {
        this.writeCtr++;
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.writeTmr > 1000) {
            this.writeTmr = currentTimeMillis;
            wireDebug(new StringBuffer().append("DBG:write():").append(this.writeCtr).append(":").append(selectionKey.interestOps()).toString());
        }
        if (this.state == -2) {
            ((WirePastryNode) getLocalNode()).getSocketManager().update(this);
        }
        try {
            if (this.writer.write((SocketChannel) selectionKey.channel()) && this.state == -4) {
                close(null);
            }
        } catch (IOException e) {
            debug(new StringBuffer().append("ERROR writing - cancelling. ").append(e).toString());
            close(this.writer.getQueue());
        } catch (NullPointerException e2) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void wireDebug(String str) {
        if (Wire.outputDebug) {
            synchronized (Wire.outputStreamLock) {
                try {
                    if (this.outputStream == null) {
                        if (this.localnode == null) {
                            return;
                        }
                        this.outputStream = new PrintStream(new FileOutputStream(new StringBuffer().append("WNH ").append(this.localnode.getId().toString()).append("->").append(getNodeId().toString()).append(".txt").toString()));
                    }
                    this.outputStream.println(str);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void read(SelectionKey selectionKey) {
        wireDebug("DBG:read()");
        if (this.state == -2) {
            ((WirePastryNode) getLocalNode()).getSocketManager().update(this);
        }
        SocketChannelReader socketChannelReader = this.reader;
        SocketChannelWriter socketChannelWriter = this.writer;
        if (socketChannelReader == null || socketChannelWriter == null) {
            return;
        }
        while (true) {
            try {
                Object read = socketChannelReader.read((SocketChannel) selectionKey.channel());
                if (read == null) {
                    return;
                }
                if (read != null) {
                    if (read instanceof SocketCommandMessage) {
                        debug(new StringBuffer().append("Read socket message ").append(read).append(" - passing to node handle.").toString());
                        receiveSocketMessage((SocketCommandMessage) read);
                    } else {
                        if (!(read instanceof SocketTransportMessage)) {
                            throw new IllegalArgumentException(new StringBuffer().append("Message ").append(read).append(" was not correctly wrapped.").toString());
                        }
                        SocketTransportMessage socketTransportMessage = (SocketTransportMessage) read;
                        if (!socketTransportMessage.getDestination().equals((Id) getLocalNode().getNodeId())) {
                            debug(new StringBuffer().append("Read message ").append(read).append(" at ").append(this.nodeId).append(" for wrong nodeId ").append(socketTransportMessage.getDestination()).append(" - killing connection.").toString());
                            throw new IOException("Incoming message was for incorrect node id.");
                        }
                        debug(new StringBuffer().append("Read message ").append(read).append(" - passing to pastry node.").toString());
                        wireDebug(new StringBuffer().append("DBG:").append(System.identityHashCode(socketTransportMessage.getObject())).append(",").append(socketTransportMessage.getObject()).toString());
                        wireDebug(new StringBuffer().append(WireFileProcessor.receivedPrefix).append(socketTransportMessage.getObject()).toString());
                        getLocalNode().receiveMessage((Message) socketTransportMessage.getObject());
                    }
                }
            } catch (DeserializationException e) {
                System.out.println("An error occured during message deserialization - ignoring message...");
                socketChannelReader.reset();
                return;
            } catch (ImproperlyFormattedMessageException e2) {
                System.out.println(new StringBuffer().append("Improperly formatted message found during parsing - ignoring message... ").append(e2).toString());
                socketChannelReader.reset();
                return;
            } catch (IOException e3) {
                debug(new StringBuffer().append("Error occurred during reading from ").append(this.address).append(" at ").append(getNodeId()).append(" - closing socket. ").append(e3).toString());
                close(socketChannelWriter.getQueue());
                return;
            }
        }
    }

    @Override // rice.pastry.dist.DistCoalesedNodeHandle
    public boolean pingImpl() {
        if (this.isLocal) {
            setProximity(0);
            return this.alive;
        }
        if (getLocalNode() != null && ((WireNodeHandle) getLocalNode().getLocalHandle()).getAddress().getAddress().equals(this.address.getAddress())) {
            setProximity(1);
            return this.alive;
        }
        if (getLocalNode() != null) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastpingtime < PING_THROTTLE * 1000) {
                return this.alive;
            }
            this.lastpingtime = currentTimeMillis;
            ((WirePastryNode) getLocalNode()).getDatagramManager().write(this.nodeId, this.address, new PingMessage(getLocalNode().getNodeId(), this.nodeId, 0, this));
        }
        return this.alive;
    }

    public void pingStarted() {
        this.lastpingtime = System.currentTimeMillis();
    }

    public void pingResponse() {
        if (this.isLocal) {
            debug("ERROR (pingResponse): Ping should never be sent to local node...");
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (proximity() > ((int) (currentTimeMillis - this.lastpingtime))) {
            setProximity((int) (currentTimeMillis - this.lastpingtime));
        }
        markAlive();
    }

    @Override // rice.pastry.dist.DistCoalesedNodeHandle
    public String toStringImpl() {
        return new StringBuffer().append("[").append(this.nodeId).append(" (").append(this.address.getAddress().getHostAddress()).append(":").append(this.address.getPort()).append(") on ").append(this.localnode).append("]").toString();
    }

    private int getAddress(InetAddress inetAddress) {
        byte[] address = inetAddress.getAddress();
        return (address[0] << 24) | (address[1] << 16) | (address[2] << 8) | address[3];
    }

    private void setState(int i, String str) {
        if (this.state == -1 && i == -2) {
            Wire.acquireFileDescriptor();
        }
        if (i == -1 && this.state == -2) {
            Wire.releaseFileDescriptor();
        }
        if (i == -3 && this.state == -2) {
            Wire.releaseingFileDescriptor();
        }
        if (i == -1 && this.state == -3) {
            Wire.doneReleaseingFileDescriptor();
        }
        this.state = i;
        String str2 = "unknown";
        switch (i) {
            case STATE_USING_UDP_WAITING_TO_DISCONNECT /* -4 */:
                str2 = "USING_UDP_WAITING_TO_DISCONNECT";
                break;
            case STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT /* -3 */:
                str2 = "USING_UDP_WAITING_FOR_TCP_DISCONNECT";
                break;
            case -2:
                str2 = "USING_TCP";
                break;
            case -1:
                str2 = "USING_UDP";
                break;
        }
        wireDebug(new StringBuffer().append("DBG:setState(").append(str2).append("):").append(str).toString());
    }

    private int messageSize(Object obj) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(new SocketTransportMessage(obj, this.nodeId));
        objectOutputStream.flush();
        return byteArrayOutputStream.toByteArray().length;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeDueToError() {
        if (this.writer != null) {
            close(this.writer.getQueue());
        } else {
            close(null);
        }
    }

    private void close(Iterator it) {
        synchronized (this.localnode) {
            synchronized (this) {
                if (this.state == -1) {
                    return;
                }
                try {
                    debug("Closing and cleaning up socket.");
                    if (this.key != null) {
                        this.key.channel().close();
                        this.key.cancel();
                        this.key.attach(null);
                    }
                    if (this.state == -2) {
                        debug("Disconnect was unexpected - marking node as dead.");
                        ((WirePastryNode) getLocalNode()).getSocketManager().closeSocket(this);
                        markDead();
                    }
                    setState(-1, "close()");
                    if (it != null) {
                        debug(new StringBuffer().append("Messages contains ").append(this.writer.queueSize()).append(" messages.").toString());
                        while (it.hasNext()) {
                            Object next = it.next();
                            if (next instanceof SocketTransportMessage) {
                                SocketTransportMessage socketTransportMessage = (SocketTransportMessage) next;
                                if (socketTransportMessage.getObject() instanceof RouteMessage) {
                                    RouteMessage routeMessage = (RouteMessage) socketTransportMessage.getObject();
                                    routeMessage.nextHop = null;
                                    getLocalNode().receiveMessage(routeMessage);
                                    debug(new StringBuffer().append("Rerouted message ").append(routeMessage).toString());
                                } else {
                                    wireDebug(new StringBuffer().append(WireFileProcessor.dropPrefix).append(socketTransportMessage).toString());
                                    debug(new StringBuffer().append("Dropped message ").append(socketTransportMessage).append(" on floor.").toString());
                                }
                            } else {
                                debug(new StringBuffer().append("Dropped message ").append(next).append(" on floor.").toString());
                                wireDebug(new StringBuffer().append(WireFileProcessor.dropPrefix).append(next).toString());
                            }
                        }
                    }
                    debug("Done rerouting messages...");
                    this.writer = null;
                    this.reader = null;
                } catch (IOException e) {
                    System.out.println(new StringBuffer().append("IOException ").append(e).append(" disconnecting from remote node ").append(this.address).toString());
                    markDead();
                }
            }
        }
    }

    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        setState(-1, "readObject()");
    }
}
