package rice.pastry.wire;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import rice.pastry.NodeId;
import rice.pastry.PastryNode;
import rice.pastry.dist.DistNodeHandle;
import rice.pastry.messaging.Message;
import rice.pastry.routing.RouteMessage;
import rice.pastry.wire.messaging.datagram.PingMessage;
import rice.pastry.wire.messaging.socket.DisconnectMessage;
import rice.pastry.wire.messaging.socket.HelloMessage;
import rice.pastry.wire.messaging.socket.HelloResponseMessage;
import rice.pastry.wire.messaging.socket.SocketCommandMessage;
import rice.pastry.wire.messaging.socket.SocketTransportMessage;

/* loaded from: input_file:rice/pastry/wire/WireNodeHandle.class */
public class WireNodeHandle extends DistNodeHandle implements SelectionKeyHandler {
    // ---- Values stored in the `state` field ----

    /** State assigned by the constructors, by readObject() and at the end of close(): no socket is in use. */
    public static int STATE_USING_UDP = -1;
    /** State assigned by setKey() once a selection key (socket channel) has been attached to this handle. */
    public static int STATE_USING_TCP = -2;
    /** State assigned by disconnect() when a DisconnectMessage is queued for the remote side. */
    public static int STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT = -3;
    /** State assigned by receiveSocketMessage() when a DisconnectMessage arrives while in STATE_USING_TCP. */
    public static int STATE_USING_UDP_WAITING_TO_DISCONNECT = -4;

    /** Largest serialized message size that receiveMessageImpl() still hands to the datagram manager while in STATE_USING_UDP. */
    public static int MAX_UDP_MESSAGE_SIZE = DatagramManager.DATAGRAM_SEND_BUFFER_SIZE;
    /** Send and receive buffer size applied to sockets opened by connectToRemoteNode(). */
    public static int SOCKET_BUFFER_SIZE = 32768;
    /** Minimum interval between pings sent by pingImpl(); multiplied by 1000 and compared against milliseconds, i.e. seconds. */
    public static int PING_THROTTLE = 5;

    /** Network address of the node this handle refers to. */
    private InetSocketAddress address;
    /** System.currentTimeMillis() value recorded by pingImpl() / pingStarted(); 0 until the first ping. */
    private transient long lastpingtime;
    /** Reader for the current socket; created in setKey(), cleared in close(). */
    private transient SocketChannelReader reader;
    /** Writer (outgoing queue) for the current socket; created in setKey(), cleared in close(). */
    private transient SocketChannelWriter writer;
    /** Selection key of the socket channel currently attached to this handle, if any. */
    private transient SelectionKey key;
    /** Current connection state; one of the STATE_* values above. */
    private transient int state;

    /**
     * Creates a handle for a node without associating it with a local node.
     * The handle starts in {@link #STATE_USING_UDP} with no ping recorded.
     *
     * @param inetSocketAddress network address of the node
     * @param nodeId            id of the node
     */
    public WireNodeHandle(InetSocketAddress inetSocketAddress, NodeId nodeId) {
        super(nodeId);
        // Log first, before any field is assigned, exactly as the original did.
        debug("creating Socket handle for node: " + nodeId + " address: " + inetSocketAddress);
        this.state = STATE_USING_UDP;
        this.lastpingtime = 0L;
        this.address = inetSocketAddress;
    }

    /**
     * Creates a handle for a node and binds it to the given local node.
     * The handle starts in {@link #STATE_USING_UDP} with no ping recorded.
     *
     * @param inetSocketAddress network address of the node
     * @param nodeId            id of the node
     * @param pastryNode        local node passed to {@code setLocalNode}
     */
    public WireNodeHandle(InetSocketAddress inetSocketAddress, NodeId nodeId, PastryNode pastryNode) {
        super(nodeId);
        // Log first, before any field is assigned, exactly as the original did.
        debug("creating Socket handle for node: " + nodeId + ", local: " + pastryNode + " address: " + inetSocketAddress);
        this.state = STATE_USING_UDP;
        this.lastpingtime = 0L;
        this.address = inetSocketAddress;
        // Bind the local node last, once the handle's own fields are populated.
        setLocalNode(pastryNode);
    }

    /**
     * Returns the network address of the node this handle refers to.
     *
     * @return the address supplied at construction time
     */
    public InetSocketAddress getAddress() {
        final InetSocketAddress remoteAddress = this.address;
        return remoteAddress;
    }

    /**
     * Returns the current connection state of this handle.
     *
     * @return one of the {@code STATE_*} values
     */
    public int getState() {
        final int currentState = this.state;
        return currentState;
    }

    /**
     * Handles a socket-level command message received for this handle.
     *
     * <ul>
     *   <li>{@code DisconnectMessage}: in {@link #STATE_USING_TCP} the handle moves to
     *       {@link #STATE_USING_UDP_WAITING_TO_DISCONNECT}, deregisters from the socket manager and
     *       requests write interest; in {@link #STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT} the socket
     *       is closed; in any other state the message is only reported.</li>
     *   <li>{@code HelloResponseMessage}: the node is marked alive and the writer is told the
     *       greeting was received.</li>
     *   <li>Anything else is reported on standard output and dropped.</li>
     * </ul>
     *
     * @param socketCommandMessage the command message to process
     */
    public void receiveSocketMessage(SocketCommandMessage socketCommandMessage) {
        if (socketCommandMessage instanceof DisconnectMessage) {
            debug("Received DisconnectMessage (state == " + this.state + ")");
            if (this.state == STATE_USING_TCP) {
                this.state = STATE_USING_UDP_WAITING_TO_DISCONNECT;
                ((WirePastryNode) getLocalNode()).getSocketManager().closeSocket(this);
                // Ask the selector for a write callback; write() closes the socket in this state.
                this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
            } else if (this.state == STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT) {
                close(null);
            } else {
                System.out.println("Recieved DisconnectMessage at non-connected socket - not fatal... (state == " + this.state + ")");
            }
            return;
        }

        if (socketCommandMessage instanceof HelloResponseMessage) {
            markAlive();
            this.writer.greetingReceived();
            return;
        }

        System.out.println("Received unreconginzed SocketCommandMessage " + socketCommandMessage + " - dropping on floor");
    }

    /**
     * Delivers a message to the node this handle refers to.
     *
     * <p>A local handle passes the message straight to the local node. Otherwise the transport
     * depends on the state: in {@link #STATE_USING_TCP} the message is queued on the socket writer;
     * in {@link #STATE_USING_UDP} it goes over the datagram manager if its serialized size is at most
     * {@link #MAX_UDP_MESSAGE_SIZE}, else a socket is opened with the message queued on it; in the
     * remaining states it is always handed to the datagram manager.
     *
     * @param message the message to deliver
     */
    @Override // rice.pastry.dist.DistNodeHandle
    public void receiveMessageImpl(Message message) {
        assertLocalNode();
        WirePastryNode localNode = (WirePastryNode) getLocalNode();

        if (this.isLocal) {
            debug("Sending message " + message + " locally");
            localNode.receiveMessage(message);
            return;
        }

        debug("Passing message " + message + " to the socket controller for writing (state == " + this.state + ")");

        if (this.state == STATE_USING_TCP) {
            this.writer.enqueue(new SocketTransportMessage(message));
            if (localNode.inThread()) {
                this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
            } else {
                // Not on the selector thread: wake the selector instead of touching the key here.
                localNode.getSelectorManager().getSelector().wakeup();
            }
            debug("Enqueued message " + message + " for writing in socket writer.");
            return;
        }

        try {
            // The serialized size is only measured while in plain UDP state.
            if (this.state == STATE_USING_UDP && messageSize(message) > MAX_UDP_MESSAGE_SIZE) {
                debug("Message is too large - open up socket!");
                LinkedList pending = new LinkedList();
                pending.addFirst(new SocketTransportMessage(message));
                connectToRemoteNode(pending);
            } else {
                if (this.state == STATE_USING_UDP) {
                    debug("Message is small enough to go over UDP - sending.");
                }
                localNode.getDatagramManager().write(this.nodeId, this.address, message);
            }
        } catch (IOException e) {
            System.out.println("IOException serializing message " + message + " - cancelling message.");
        }
    }

    /**
     * Computes how many bytes {@code obj} occupies when wrapped in a
     * {@code SocketTransportMessage} and written with Java object serialization.
     *
     * @param obj the payload to measure
     * @return the number of serialized bytes
     * @throws IOException if serialization fails
     */
    private int messageSize(Object obj) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ObjectOutputStream serializer = new ObjectOutputStream(sink);
        serializer.writeObject(new SocketTransportMessage(obj));
        serializer.flush();
        // size() reports the same count as toByteArray().length.
        return sink.size();
    }

    /**
     * Opens a non-blocking socket to the remote node and queues the given messages on it.
     * Does nothing unless the handle is in {@link #STATE_USING_UDP}.
     *
     * <p>The channel is registered with the local node's selector for read interest, plus connect
     * interest when the connection did not complete immediately. {@link #setKey} then attaches the
     * key and creates the reader/writer with a {@code HelloMessage} greeting.
     *
     * <p>If an {@code IOException} occurs, the freshly opened channel is closed here. Such a failure
     * can happen before the channel is registered, in which case {@code close(LinkedList)} has no
     * key through which to reach it. The handle is then forced through the
     * "unexpected disconnect" path of {@code close}, which handles the pending messages.
     *
     * @param linkedList messages to enqueue on the new socket's writer; may be {@code null}
     */
    public void connectToRemoteNode(LinkedList linkedList) {
        if (this.state != STATE_USING_UDP) {
            return;
        }
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.socket().setSendBufferSize(SOCKET_BUFFER_SIZE);
            channel.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE);
            channel.configureBlocking(false);
            boolean connected = channel.connect(this.address);
            debug("Opening socket to " + this.address);

            WirePastryNode localNode = (WirePastryNode) getLocalNode();
            Selector selector = localNode.getSelectorManager().getSelector();
            synchronized (selector) {
                int ops = connected
                        ? SelectionKey.OP_READ
                        : (SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
                this.key = channel.register(selector, ops);
            }
            setKey(this.key, new HelloMessage(localNode, this.nodeId));

            if (linkedList != null) {
                for (Iterator it = linkedList.iterator(); it.hasNext();) {
                    Object pending = it.next();
                    debug("Enqueueing message " + pending + " into socket channel writer.");
                    this.writer.enqueue(pending);
                }
            }

            if (localNode.inThread()) {
                this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
            } else {
                // Not on the selector thread: wake the selector instead of touching the key here.
                localNode.getSelectorManager().getSelector().wakeup();
            }
        } catch (IOException e) {
            debug("IOException connecting to remote node " + this.address);
            // Release the half-open channel; closing an already-closed channel is a no-op.
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Best effort: the connection attempt has already failed.
                }
            }
            // Pretend we were connected so close() treats this as an unexpected disconnect.
            this.state = STATE_USING_TCP;
            close(linkedList);
        }
    }

    /**
     * Associates a socket's selection key with this handle.
     *
     * <p>In {@link #STATE_USING_UDP} the key is adopted: it is attached to this handle, the handle
     * is registered with the socket manager, the datagram ack number for the node is reset, a fresh
     * reader and writer are created and the state becomes {@link #STATE_USING_TCP}.
     *
     * <p>In any other state a socket already exists ("double socket"). The local and remote
     * addresses (then ports) are compared to decide which socket survives: if the local endpoint
     * compares lower or equal, the existing socket is kept and the new key is detached; otherwise
     * the existing channel is closed and the new key takes over.
     *
     * <p>When the existing socket is replaced, the old key is cancelled and the writer is reset
     * even if closing the old channel fails. The handle always goes on to use the new key, and a
     * writer left over from the old socket would not send the greeting on the new one.
     *
     * @param selectionKey         key of the new socket channel
     * @param socketCommandMessage greeting message handed to the writer
     */
    public void setKey(SelectionKey selectionKey, SocketCommandMessage socketCommandMessage) {
        debug("Got new key  (state == " + this.state + ")");
        WirePastryNode localNode = (WirePastryNode) getLocalNode();

        if (this.state == STATE_USING_UDP) {
            this.key = selectionKey;
            selectionKey.attach(this);
            localNode.getSocketManager().openSocket(this);
            localNode.getDatagramManager().resetAckNumber(this.nodeId);
            this.reader = new SocketChannelReader(localNode);
            this.writer = new SocketChannelWriter(localNode, socketCommandMessage);
            this.state = STATE_USING_TCP;
            return;
        }

        markAlive();
        InetSocketAddress localAddress = ((WireNodeHandle) getLocalNode().getLocalHandle()).getAddress();
        InetSocketAddress remoteAddress = getAddress();
        debug("Found double socket... (state == " + this.state + ")");
        if (this.state != STATE_USING_TCP) {
            localNode.getSocketManager().openSocket(this);
        }

        // Tie-break: compare addresses first, ports only when the addresses are equal.
        int localIp = getAddress(localAddress.getAddress());
        int remoteIp = getAddress(remoteAddress.getAddress());
        boolean keepExistingSocket = localIp < remoteIp
                || (localIp == remoteIp && localAddress.getPort() <= remoteAddress.getPort());
        if (keepExistingSocket) {
            selectionKey.attach(null);
            debug("Using our socket, letting other socket die...");
            return;
        }

        SelectionKey oldKey = this.key;
        try {
            try {
                oldKey.attach(null);
                oldKey.channel().close();
            } finally {
                // Must run even when close() throws: we switch to the new socket regardless.
                oldKey.cancel();
                this.writer.reset(socketCommandMessage);
            }
        } catch (IOException e) {
            System.out.println("ERROR closing unnecessary socket: " + e);
        }
        this.key = selectionKey;
        this.state = STATE_USING_TCP;
        selectionKey.attach(this);
        debug("Killing our socket, using new one...");
    }

    /**
     * Packs the first four bytes of an IP address into an {@code int}, most significant byte first.
     * The result is used by {@code setKey} to order two endpoints when resolving a double socket.
     *
     * <p>Each byte is masked with {@code 0xFF} before shifting. Java bytes are signed, so without
     * the mask any octet of 128 or more is sign-extended and overwrites the higher octets in the
     * OR, making distinct addresses compare as equal.
     *
     * <p>NOTE(review): this changes the computed ordering for addresses containing octets of 128
     * or more. Peers must use the same computation for the tie-break to agree, so confirm that all
     * nodes are upgraded together.
     *
     * @param inetAddress the address to encode; only its first four raw bytes are used
     * @return the four bytes combined into a single {@code int}
     */
    private int getAddress(InetAddress inetAddress) {
        byte[] raw = inetAddress.getAddress();
        return ((raw[0] & 0xFF) << 24)
                | ((raw[1] & 0xFF) << 16)
                | ((raw[2] & 0xFF) << 8)
                | (raw[3] & 0xFF);
    }

    /**
     * Hook for responding to a greeting; this implementation does nothing.
     *
     * @param helloMessage the greeting that was received (unused)
     */
    public void sendGreetingResponse(HelloMessage helloMessage) {
        // Intentionally empty.
    }

    /**
     * Starts an orderly shutdown of the socket.
     *
     * <p>Only valid in {@link #STATE_USING_TCP}: the handle moves to
     * {@link #STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT}, queues a {@code DisconnectMessage} and
     * requests a write. In any other state the request is reported on standard output and ignored.
     */
    public void disconnect() {
        debug("Received disconnect request... (state == " + this.state + ")");

        if (this.state == STATE_USING_TCP) {
            this.state = STATE_USING_UDP_WAITING_FOR_TCP_DISCONNECT;
            this.writer.enqueue(new DisconnectMessage());

            WirePastryNode localNode = (WirePastryNode) getLocalNode();
            if (localNode.inThread()) {
                this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
            } else {
                // Not on the selector thread: wake the selector instead of touching the key here.
                localNode.getSelectorManager().getSelector().wakeup();
            }
        } else {
            System.out.println("Recieved disconnect request at non-connected socket - very bad... (state == " + this.state + ")");
        }
    }

    /**
     * Not expected to be invoked on a node handle; only reports the call on standard output.
     *
     * @param selectionKey the key that became acceptable (unused)
     */
    @Override // rice.pastry.wire.SelectionKeyHandler
    public void accept(SelectionKey selectionKey) {
        final String complaint = "PANIC: accept() called on WireNodeHandle!";
        System.out.println(complaint);
    }

    /**
     * Completes a pending non-blocking connection on the key's channel.
     *
     * <p>When {@code finishConnect()} succeeds, connect interest is removed from the key. Any
     * {@code IOException} (including {@code ConnectException} and {@code SocketException}, which
     * are subclasses) causes the socket to be closed with the writer's queued messages.
     *
     * @param selectionKey the key whose channel became connectable
     */
    @Override // rice.pastry.wire.SelectionKeyHandler
    public void connect(SelectionKey selectionKey) {
        try {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            boolean finished = channel.finishConnect();
            if (finished) {
                selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_CONNECT);
            }
            debug("Found connectable channel - completed connection to " + this.address);
        } catch (IOException e) {
            // A single handler suffices: the original's three catch blocks were identical.
            debug("ERROR connecting - cancelling. " + e);
            close(this.writer.getQueue());
        }
    }

    /**
     * Writes queued data to the key's channel.
     *
     * <p>In {@link #STATE_USING_TCP} the socket manager is first notified via {@code update}. When
     * the writer returns {@code true} (presumably: nothing left to write), write interest is
     * removed from the key. If the handle is in {@link #STATE_USING_UDP_WAITING_TO_DISCONNECT} at
     * that point, the socket is closed. An {@code IOException} closes the socket with the writer's
     * queued messages.
     *
     * @param selectionKey the key whose channel became writable
     */
    @Override // rice.pastry.wire.SelectionKeyHandler
    public void write(SelectionKey selectionKey) {
        if (this.state == STATE_USING_TCP) {
            ((WirePastryNode) getLocalNode()).getSocketManager().update(this);
        }

        try {
            boolean done = this.writer.write((SocketChannel) selectionKey.channel());
            if (!done) {
                return;
            }
            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
            if (this.state == STATE_USING_UDP_WAITING_TO_DISCONNECT) {
                close(null);
            }
        } catch (IOException e) {
            debug("ERROR writing - cancelling. " + e);
            close(this.writer.getQueue());
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:21:0x00a7, code lost:
    
        throw new java.lang.IllegalArgumentException(new java.lang.StringBuffer().append("Message ").append(r0).append(" was not correctly wrapped.").toString());
     */
    /**
     * Read callback of {@code SelectionKeyHandler}.
     *
     * <p>WARNING: the decompiler could not recover this method's body. As it stands, the method
     * unconditionally throws {@link UnsupportedOperationException}, so this source cannot handle
     * readable keys. The only fragment the decompiler preserved (see the note above) shows that
     * the original threw {@link IllegalArgumentException} for a message that "was not correctly
     * wrapped". The body must be restored from the original source or bytecode before use.
     *
     * @param r6 the selection key whose channel became readable
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // rice.pastry.wire.SelectionKeyHandler
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void read(java.nio.channels.SelectionKey r6) {
        /*
            Method dump skipped, instructions count: 295
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: rice.pastry.wire.WireNodeHandle.read(java.nio.channels.SelectionKey):void");
    }

    /**
     * Re-synchronises the key's write interest with the writer's queue.
     *
     * <p>Does nothing when there is no writer. With an invalid key an error is printed. Otherwise
     * write interest is cleared when the writer is empty and set when it is not.
     */
    @Override // rice.pastry.wire.SelectionKeyHandler
    public void wakeup() {
        if (this.writer == null) {
            return;
        }
        if (!this.key.isValid()) {
            System.out.println("ERROR: Recieved wakeup with non-valid key! (state == " + this.state + ")");
            return;
        }

        boolean nothingQueued = this.writer.isEmpty();
        int ops = this.key.interestOps();
        if (nothingQueued) {
            this.key.interestOps(ops & ~SelectionKey.OP_WRITE);
        } else if ((ops & SelectionKey.OP_WRITE) == 0) {
            this.key.interestOps(ops | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Tears down the socket (if any), returns the handle to {@link #STATE_USING_UDP} and disposes
     * of the given pending messages.
     *
     * <p>If the handle was still in {@link #STATE_USING_TCP}, the disconnect is treated as
     * unexpected: the handle is deregistered from the socket manager and the node is marked dead.
     * The node is also marked dead if closing the channel fails.
     *
     * <p>Pending {@code SocketTransportMessage}s that wrap a {@code RouteMessage} have their
     * {@code nextHop} cleared and are handed back to the local node for rerouting. Everything else
     * is dropped.
     *
     * <p>Differences from the original:
     * <ul>
     *   <li>A failure while closing the channel no longer aborts the rest of the cleanup.
     *       Previously the state stayed unchanged, the pending messages were neither rerouted nor
     *       logged, and the reader/writer were kept.</li>
     *   <li>The reader and writer are cleared before rerouting. A rerouted message may be sent
     *       through this same handle again and open a new socket; clearing afterwards would wipe
     *       the newly created writer.</li>
     * </ul>
     *
     * @param linkedList messages that were waiting to be written; may be {@code null}
     */
    private void close(LinkedList linkedList) {
        debug("Closing and cleaning up socket.");

        boolean closeFailed = false;
        if (this.key != null) {
            try {
                this.key.channel().close();
            } catch (IOException e) {
                System.out.println("IOException " + e + " disconnecting from remote node " + this.address);
                closeFailed = true;
            }
            this.key.cancel();
            this.key.attach(null);
        }

        boolean unexpected = this.state == STATE_USING_TCP;
        if (unexpected) {
            debug("Disconnect was unexpected - marking node as dead.");
            ((WirePastryNode) getLocalNode()).getSocketManager().closeSocket(this);
        }
        if (unexpected || closeFailed) {
            // Mark dead before rerouting, as the original did on the unexpected-disconnect path.
            markDead();
        }

        this.state = STATE_USING_UDP;
        this.writer = null;
        this.reader = null;

        if (linkedList != null) {
            debug("Messages contains " + linkedList.size() + " messages.");
            for (Iterator it = linkedList.iterator(); it.hasNext();) {
                Object pending = it.next();
                if (!(pending instanceof SocketTransportMessage)) {
                    debug("Dropped message " + pending + " on floor.");
                    continue;
                }
                SocketTransportMessage transport = (SocketTransportMessage) pending;
                if (transport.getObject() instanceof RouteMessage) {
                    RouteMessage routeMessage = (RouteMessage) transport.getObject();
                    // Clear the chosen hop before handing the message back to the local node.
                    routeMessage.nextHop = null;
                    getLocalNode().receiveMessage(routeMessage);
                    debug("Rerouted message " + routeMessage);
                } else {
                    debug("Dropped message " + transport + " on floor.");
                }
            }
        }
        debug("Done rerouting messages...");
    }

    /**
     * Pings the remote node, at most once every {@link #PING_THROTTLE} seconds.
     *
     * <p>A local handle just sets its proximity to 0. For a remote handle outside the throttle
     * window, the ping time is recorded and, if a local node is set, a {@code PingMessage} is
     * written through the datagram manager.
     *
     * @return the current value of the {@code alive} flag
     */
    @Override // rice.pastry.dist.DistNodeHandle
    public boolean pingImpl() {
        if (this.isLocal) {
            setProximity(0);
        } else {
            long now = System.currentTimeMillis();
            boolean throttled = now - this.lastpingtime < PING_THROTTLE * 1000;
            if (!throttled) {
                this.lastpingtime = now;
                if (getLocalNode() != null) {
                    PingMessage ping = new PingMessage(getLocalNode().getNodeId(), this.nodeId, 0, this);
                    ((WirePastryNode) getLocalNode()).getDatagramManager().write(this.nodeId, this.address, ping);
                }
            }
        }
        return this.alive;
    }

    /**
     * Records the current time as the moment the most recent ping was started.
     */
    public void pingStarted() {
        final long now = System.currentTimeMillis();
        this.lastpingtime = now;
    }

    /**
     * Handles a ping response: lowers the proximity to the elapsed time since the last recorded
     * ping when that is smaller than the current proximity, then marks the node alive.
     * For a local handle only an error is logged.
     */
    public void pingResponse() {
        if (!this.isLocal) {
            long now = System.currentTimeMillis();
            if (proximity() > ((int) (now - this.lastpingtime))) {
                setProximity((int) (now - this.lastpingtime));
            }
            markAlive();
            return;
        }
        debug("ERROR (pingResponse): Ping should never be sent to local node...");
    }

    /**
     * Serialization hook: restores the non-transient fields, then puts the handle in
     * {@link #STATE_USING_UDP}, since the transient {@code state} is not part of the stream.
     *
     * @param objectInputStream stream the handle is being read from
     * @throws IOException            if reading from the stream fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        final int initialState = STATE_USING_UDP;
        this.state = initialState;
    }

    /**
     * Builds a human-readable description of this handle: an optional "(local " prefix, the node
     * id, a ":dead" marker when the node is not alive, the local node and the address.
     *
     * @return the description string
     */
    @Override // rice.pastry.dist.DistNodeHandle
    public String toStringImpl() {
        String prefix = this.isLocal ? "(local " : "";
        String deadMarker = this.alive ? "" : ":dead";
        return prefix + "handle " + this.nodeId + deadMarker
                + ", localnode = " + getLocalNode() + " " + this.address + ")";
    }
}
