package rice.pastry.wire;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import rice.pastry.Id;
import rice.pastry.Log;
import rice.pastry.NodeId;
import rice.pastry.messaging.Message;
import rice.pastry.wire.exception.DeserializationException;
import rice.pastry.wire.exception.ImproperlyFormattedMessageException;
import rice.pastry.wire.exception.SerializationException;
import rice.pastry.wire.messaging.datagram.AcknowledgementMessage;
import rice.pastry.wire.messaging.datagram.DatagramMessage;
import rice.pastry.wire.messaging.datagram.DatagramTransportMessage;
import rice.pastry.wire.messaging.datagram.PingMessage;

/* loaded from: input_file:rice/pastry/wire/DatagramManager.class */
public class DatagramManager implements SelectionKeyHandler {
    private int port;
    private WirePastryNode pastryNode;
    private SelectorManager manager;
    private DatagramChannel channel;
    public static int DATAGRAM_RECEIVE_BUFFER_SIZE = 131072;
    public static int DATAGRAM_SEND_BUFFER_SIZE = 65536;
    private SelectionKey key;
    private DatagramTransmissionManager transmissionManager;
    private LinkedList ackQueue = new LinkedList();
    private HashMap lastAckNum = new HashMap();
    private ByteBuffer buffer = ByteBuffer.allocateDirect(DATAGRAM_SEND_BUFFER_SIZE);

    public DatagramManager(WirePastryNode wirePastryNode, SelectorManager selectorManager, int i) {
        this.port = i;
        this.manager = selectorManager;
        this.pastryNode = wirePastryNode;
        try {
            this.channel = DatagramChannel.open();
            this.channel.configureBlocking(false);
            this.channel.socket().bind(new InetSocketAddress(i));
            this.channel.socket().setSendBufferSize(DATAGRAM_SEND_BUFFER_SIZE);
            this.channel.socket().setReceiveBufferSize(DATAGRAM_RECEIVE_BUFFER_SIZE);
            Selector selector = selectorManager.getSelector();
            synchronized (selector) {
                this.key = this.channel.register(selector, 1);
            }
            this.key.attach(this);
        } catch (IOException e) {
            System.out.println(new StringBuffer().append("PANIC: Error binding datagram server to port ").append(i).append(": ").append(e).toString());
            System.exit(-1);
        }
        this.transmissionManager = new DatagramTransmissionManager(wirePastryNode, this.key);
    }

    public void resetAckNumber(NodeId nodeId) {
        this.transmissionManager.resetAckNumber(nodeId);
        this.lastAckNum.remove(nodeId);
    }

    public void write(NodeId nodeId, InetSocketAddress inetSocketAddress, Object obj) {
        debug(new StringBuffer().append("Enqueueing write to ").append(nodeId).append(" of ").append(obj).toString());
        this.transmissionManager.add(new PendingWrite(nodeId, inetSocketAddress, obj));
        this.manager.getSelector().wakeup();
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void read(SelectionKey selectionKey) {
        while (true) {
            try {
                InetSocketAddress inetSocketAddress = (InetSocketAddress) this.channel.receive(this.buffer);
                if (inetSocketAddress == null) {
                    return;
                }
                debug(new StringBuffer().append("Received data from address ").append(inetSocketAddress).toString());
                this.buffer.flip();
                if (this.buffer.remaining() > 0) {
                    Object deserialize = deserialize(this.buffer);
                    if (deserialize instanceof DatagramMessage) {
                        DatagramMessage datagramMessage = (DatagramMessage) deserialize;
                        if (datagramMessage.getDestination().equals((Id) this.pastryNode.getNodeId())) {
                            if (((WireNodeHandlePool) this.pastryNode.getNodeHandlePool()).get(datagramMessage.getSource()) == null) {
                            }
                            if (deserialize instanceof AcknowledgementMessage) {
                                this.transmissionManager.receivedAck((AcknowledgementMessage) datagramMessage);
                            } else if (sendAck(inetSocketAddress, datagramMessage)) {
                                if (deserialize instanceof DatagramTransportMessage) {
                                    this.pastryNode.receiveMessage((Message) ((DatagramTransportMessage) deserialize).getObject());
                                } else if (!(deserialize instanceof PingMessage)) {
                                    System.out.println(new StringBuffer().append("ERROR: Recieved unreccognized datagrammessage: ").append(deserialize).toString());
                                }
                            }
                        } else {
                            debug(new StringBuffer().append("ERROR: Recieved message ").append(datagramMessage).append(" at ").append(this.pastryNode.getNodeId()).append(" for dest ").append(datagramMessage.getDestination()).append(" - dropping on floor.").toString());
                        }
                    } else {
                        System.out.println(new StringBuffer().append("ERROR: Received unrecognized message ").append(deserialize).append(" at ").append(this.pastryNode.getNodeId()).append(" - dropping on floor.").toString());
                    }
                } else {
                    debug("Read from datagram channel, but no bytes were there - no bad, but wierd.");
                }
            } catch (IOException e) {
                debug(new StringBuffer().append("ERROR (datagrammanager:read): ").append(e).toString());
                e.printStackTrace();
                return;
            }
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void write(SelectionKey selectionKey) {
        try {
            Iterator it = this.ackQueue.iterator();
            while (it.hasNext()) {
                AcknowledgementMessage acknowledgementMessage = (AcknowledgementMessage) it.next();
                if (this.channel.send(serialize(acknowledgementMessage), acknowledgementMessage.getAddress()) > 0) {
                    it.remove();
                } else {
                    System.out.println("ERROR: 0 bytes written of ack (not fatal, but bad)");
                }
            }
            Iterator ready = this.transmissionManager.getReady();
            while (ready.hasNext()) {
                PendingWrite pendingWrite = (PendingWrite) ready.next();
                if (this.channel.send(serialize(pendingWrite.getObject()), pendingWrite.getAddress()) == 0) {
                    System.out.println("ERROR: 0 bytes were written (not fatal, but bad) - full buffer.");
                }
                debug(new StringBuffer().append("Wrote message ").append(pendingWrite.getObject()).append(" to ").append(pendingWrite.getDestination()).toString());
            }
        } catch (IOException e) {
            System.out.println(new StringBuffer().append("ERROR (datagrammanager:write): ").append(e).toString());
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void wakeup() {
        this.transmissionManager.wakeup();
        if (this.ackQueue.size() > 0) {
            this.key.interestOps(this.key.interestOps() | 4);
        }
    }

    private boolean sendAck(InetSocketAddress inetSocketAddress, DatagramMessage datagramMessage) throws IOException {
        if (this.channel.send(serialize(datagramMessage.getAck(inetSocketAddress)), inetSocketAddress) == 0) {
            this.ackQueue.add(datagramMessage.getAck(inetSocketAddress));
        }
        Integer num = (Integer) this.lastAckNum.get(datagramMessage.getSource());
        if (num == null || num.intValue() < datagramMessage.getNum()) {
            this.lastAckNum.put(datagramMessage.getSource(), new Integer(datagramMessage.getNum()));
            return true;
        }
        if (num.intValue() <= datagramMessage.getNum()) {
            return false;
        }
        debug(new StringBuffer().append(this.pastryNode.getNodeId()).append(" (M): ERROR: Got transmission with ack less than the last ack - ignoring message.").append(" This is probably becuase a socket is being opened, but we haven't yet noticed it.").toString());
        return false;
    }

    public static ByteBuffer serialize(Object obj) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            new ObjectOutputStream(byteArrayOutputStream).writeObject(obj);
            int length = byteArrayOutputStream.toByteArray().length;
            return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
        } catch (InvalidClassException e) {
            System.out.println("PANIC: Object to be serialized was an invalid class!");
            throw new SerializationException("Invalid class during attempt to serialize.");
        } catch (NotSerializableException e2) {
            System.out.println(new StringBuffer().append("PANIC: Object to be serialized was not serializable! [").append(obj).append("]").toString());
            throw new SerializationException(new StringBuffer().append("Unserializable class ").append(obj).append(" during attempt to serialize.").toString());
        }
    }

    public static Object deserialize(ByteBuffer byteBuffer) throws IOException {
        byte[] bArr = new byte[byteBuffer.remaining()];
        byteBuffer.get(bArr);
        byteBuffer.clear();
        try {
            return new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
        } catch (InvalidClassException e) {
            System.out.println("PANIC: Serialized message was an invalid class!");
            throw new DeserializationException("Invalid class in message - closing channel.");
        } catch (ClassNotFoundException e2) {
            System.out.println("PANIC: Unknown class type in serialized message!");
            throw new ImproperlyFormattedMessageException("Unknown class type in message - closing channel.");
        }
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void accept(SelectionKey selectionKey) {
        System.out.println("PANIC: accept was called on the DatagramManager!");
    }

    @Override // rice.pastry.wire.SelectionKeyHandler
    public void connect(SelectionKey selectionKey) {
        System.out.println("PANIC: connect was called on the DatagramManager!");
    }

    private void debug(String str) {
        if (Log.ifp(8)) {
            System.out.println(new StringBuffer().append(this.pastryNode.getNodeId()).append(" (DM): ").append(str).toString());
        }
    }
}
