The FreePastry Tutorial.
This tutorial is designed to get you cooking quickly with the FreePastry
API and software toolkit.
Version @tutorial_version@; @tutorial_date@. For FreePastry version @freepastry_version@. Maintained by @maintainer@.
Transport Layers
Modify low level details of FreePastry's network stack.
This tutorial will show you how to use the new TransportLayer interface in the package org.mpisws.p2p.transport. It will also show you how to integrate your TransportLayer into the SocketPastryNodeFactory network stack. Warning: This tutorial is more difficult than the previous tutorials and is intended for developers who wish to make low-level changes to the FreePastry network stack. We will start by stating some of the goals of the Layered Transport Layer. Versions of FreePastry before 2.1 had a unified transport layer. When features at this level needed to be added or modified, it was very complicated to get all of the parts to work properly. The new version rearranges network transport into a stack of layers, each with its own small task. To give you an idea of what a typical layer's task looks like, we will describe the layers that are assembled when creating the SocketPastryNodeFactory in FreePastry 2.1, beginning with the lowest layer:
SocketPastryNodeFactory's Layers:
- Wire -- Opens/Accepts Sockets, Sends/Receives Datagrams
- MagicNumber -- Throws away sockets/datagrams for other applications such as HTTP (if it doesn't match the application specific magic number)
- MultiInetAddressTransportLayer -- Handles multi-homing (e.g., when using a NAT, a node may have more than one address:port: the internal address and the NAT's forwarded external address)
- SourceRoute -- sends messages/opens sockets along a source route to route around temporary routing anomalies. This layer manages both the endpoints and the intermediate nodes. Note that this layer does not determine the optimal route to an end host, that is done by another layer, the SourceRouteManager.
- LowerIdentity -- This layer, in conjunction with the UpperIdentity, maintains the "intention" of the sent/received message. For example, if a node has restarted with a different NodeId, this layer drops pings/sockets that were intended for the previous node at this address.
- Liveness -- Pings nodes to determine liveness/proximity, implements 2 new interfaces: LivenessProvider, Pinger
- SourceRouteManager -- Chooses the appropriate SourceRoute based on the liveness/proximity exported by the lower layer, implements another new interface ProximityProvider
- Priority -- Uses a single TCP socket to send messages. Can select the order of the messages based on the priority.
- UpperIdentity -- (See Lower Identity) This layer keeps track of the intended destination of the message so that the lower layer can properly encode that intention.
- CommonAPI -- Serializes/deserializes messages between a RawMessage and a ByteBuffer.
Other interesting layers:
- DirectTransportLayer -- This implements the discrete event simulator, and is used by the DirectPastryNodeFactory.
Upcoming layers:
- SSL -- Provides Crypto/Authentication (typically goes above the SourceRoute layer to provide end-to-end crypto/auth)
- BandwidthLimiting -- Limits the Bandwidth of a node (typically goes between MagicNumber/SourceRoute Layers)
- PeerReview -- Provides protocol accountability (typically goes near the top, such as between CommonAPI/Priority layers)
- STUN -- would likely replace the Wire Layer and provide NAT hole-punching
The TransportLayer interface:
public interface TransportLayer<Identifier, MessageType> extends Destructable {}

Each transport layer operates on an Identifier (InetSocketAddress, SourceRoute, NodeHandle, etc.) and a MessageType (ByteBuffer, RawMessage, etc.). The most common operations are sending messages and opening sockets:
public MessageRequestHandle<Identifier, MessageType> sendMessage(
    Identifier i,
    MessageType m,
    MessageCallback<Identifier, MessageType> deliverAckToMe,
    Map<String, Object> options);

public SocketRequestHandle<Identifier> openSocket(
    Identifier i,
    SocketCallback<Identifier> deliverSocketToMe,
    Map<String, Object> options);

For these methods, you need the Identifier of the remote node, and (for sendMessage()) the Message to be delivered. Additionally, you may specify some transport-layer specific Options such as Guaranteed/Unguaranteed/Encrypted etc. We will describe some of these options in a later tutorial. Finally, you provide a Callback (deliverAckToMe/deliverSocketToMe) to deliver notification of success or failure when the operation completes. These calls are non-blocking and return a RequestHandle. The RequestHandle is like a receipt or a tracking number: you can use it to cancel the request if it is no longer necessary, for example if the operation takes too long. The next tutorial shows you how to work with a RequestHandle.
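As a quick illustration of these calls, here is a minimal sketch of sending a message and cancelling it if it takes too long. The class and method names are invented for the example, the options map is simply left null, the import locations for MessageCallback/MessageRequestHandle are assumed to be the org.mpisws.p2p.transport package named above, and the single-shot timer call is assumed to behave like java.util.Timer as in the timer tutorial. This is a sketch, not code from the release.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;

import rice.environment.Environment;
import rice.selector.TimerTask;

public class SendWithTimeoutSketch {
  /**
   * Sketch: send a ByteBuffer over a layer keyed by InetSocketAddress and
   * cancel the request if it has not completed within timeoutMillis.
   */
  public static void send(final TransportLayer<InetSocketAddress, ByteBuffer> tl,
      InetSocketAddress remote, Environment env, long timeoutMillis) {
    ByteBuffer msg = ByteBuffer.wrap("hello".getBytes());

    // non-blocking send; the returned handle is our "tracking number"
    final MessageRequestHandle<InetSocketAddress, ByteBuffer> handle =
        tl.sendMessage(remote, msg,
            new MessageCallback<InetSocketAddress, ByteBuffer>() {
              public void ack(MessageRequestHandle<InetSocketAddress, ByteBuffer> mrh) {
                // the layer below has accepted responsibility for the message
              }
              public void sendFailed(MessageRequestHandle<InetSocketAddress, ByteBuffer> mrh,
                  IOException reason) {
                // the send did not complete; inspect "reason"
              }
            }, null); // no transport-specific options in this sketch

    // if the operation takes too long, cancel it via the handle
    env.getSelectorManager().getTimer().schedule(new TimerTask() {
      public void run() {
        handle.cancel(); // has no effect if the request already completed
      }
    }, timeoutMillis);
  }
}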
The TransportLayerCallback interface:
The TransportLayerCallback provides the inverse operations (the result of a remote node sending a message or opening a socket) and must have the identical Identifier/MessageType:

public interface TransportLayerCallback<Identifier, MessageType> {
  public void messageReceived(Identifier i, MessageType m, Map<String, Object> options) throws IOException;
  public void incomingSocket(P2PSocket<Identifier> s) throws IOException;
}

The P2PSocket is similar to the AppSocket. It is non-blocking and you must register each time you use it, as explained in the AppSocket tutorial. If you have not taken the AppSocket tutorial already, you should probably do that one first. Here are the calls for the P2PSocket:

void register(boolean wantToRead, boolean wantToWrite, P2PSocketReceiver receiver);
long read(ByteBuffer dsts) throws IOException;
long write(ByteBuffer srcs) throws IOException;
void shutdownOutput();
void close();
Identifier getIdentifier();
Map getOptions();
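To show the register-before-every-use pattern in context, here is a small sketch of a write helper you might call from a P2PSocketReceiver's select callback (as in the AppSocket tutorial). The class and method names are invented for this example, and the import locations are assumed from the package named above.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.P2PSocketReceiver;

public class P2PSocketWriteSketch {
  /**
   * Sketch: called from your P2PSocketReceiver once the selector says the
   * socket is writable. Writes what it can, then either re-registers to
   * finish the job or shuts down the output when the payload is done.
   */
  public static <Identifier> void writeWhenReady(P2PSocket<Identifier> socket,
      ByteBuffer payload, P2PSocketReceiver<Identifier> receiver) throws IOException {
    long wrote = socket.write(payload); // may write fewer bytes than payload.remaining()
    if (wrote < 0) {                    // EOF/error is usually signalled as -1
      socket.close();
      return;
    }
    if (payload.hasRemaining()) {
      // not everything fit: register again and continue on the next select result
      socket.register(false, true, receiver);
    } else {
      socket.shutdownOutput();          // we are finished sending on this socket
    }
  }
}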
Other calls in the TransportLayer:
This method returns the Identifier of the local node for this layer:

public Identifier getLocalIdentifier();

These methods can control flow by rejecting new messages/sockets if the local node is being overwhelmed:

public void acceptSockets(boolean b);
public void acceptMessages(boolean b);

This method sets the callback:

public void setCallback(TransportLayerCallback<Identifier, MessageType> callback);

This method sets the ErrorHandler, which is used for notification of unexpected behavior, for example when the acceptor socket closes or an unexpected message arrives:

public void setErrorHandler(ErrorHandler<Identifier> handler);

This method cleans up the layer, in case you wish to shut down the transport layer without exiting the JVM. For example, it will close the ServerSocket that accepts new sockets, as well as clean up any memory:

public void destroy();
Your first new TransportLayer
Overview:
- In this tutorial, we will create a new transport layer that caps the peak outgoing bandwidth. We will use a bucket system with a configurable bucket size and bucket time limit. For example, to limit bandwidth to 10K/second, we can allow 10K for any one second, or 1K for each 1/10th of a second.
- For simplicity, we won't distinguish between socket and datagram traffic.
- The obvious place for this layer will be just above the Wire layer, but to provide maximum flexibility, we would like this layer to work with any Identifier. Thus we will keep the Identifier parameter generic.
- Because of the nature of the layer, we must specify a message type that has a size. In this case, we'll use the ByteBuffer as our Message type.
- To insert our new layer between two existing layers, we also need to implement a TransportLayerCallback with the same type parameters.
Download the tutorial files BandwidthLimitingTransportLayer.java, NotEnoughBandwidthException.java, DistTutorial.java, MyApp.java, and MyMsg.java into a directory called rice/tutorial/transportlayer/.
Here is the definition of our new class.

public class BandwidthLimitingTransportLayer<Identifier> implements
    TransportLayer<Identifier, ByteBuffer>,
    TransportLayerCallback<Identifier, ByteBuffer> {
}

Here is the constructor. It takes the TransportLayer immediately below this layer, the bucket size, and the bucket time length. We also need access to the environment so we can create a logger.

public BandwidthLimitingTransportLayer(
    TransportLayer<Identifier, ByteBuffer> tl,
    long bucketSize, int bucketTimelimit,
    Environment env) {
  this.environment = env;
  this.tl = tl;
  BUCKET_SIZE = bucketSize;
  BUCKET_TIME_LIMIT = bucketTimelimit;
  logger = env.getLogManager().getLogger(BandwidthLimitingTransportLayer.class, null);
  tl.setCallback(this);
}

You can look at the code to see the declaration of these fields. The last thing we do is set ourselves as the lower layer's callback; this causes it to deliver messages/sockets to us. This variable is the bucket.
/**
 * When this goes to zero, don't send messages
 */
protected long bucket;

Now let's create a task to refill the bucket.

environment.getSelectorManager().getTimer().schedule(new TimerTask(){
  @Override
  public void run() {
    // always synchronize on the enclosing transport layer (which owns the bucket) before modifying it
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket = BUCKET_SIZE;
    }
  }
}, 0, BUCKET_TIME_LIMIT);

If the TimerTask is unfamiliar, please review the timer tutorial. This interface is slightly different in that it calls run() rather than sending a MessageToSelf, but it is the same idea.
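To make the bucket parameters concrete, here is a hypothetical instantiation of the layer over a wire-level transport layer; the variables wire and env are assumed to exist. Both configurations cap peak bandwidth at roughly 10K/second, but the smaller bucket spreads the allowance more evenly over time. In practice you construct only one layer over a given lower layer, since the constructor registers itself as that layer's callback.

// Two ~10K/second configurations of our layer; "wire" is assumed to be the
// TransportLayer<InetSocketAddress, ByteBuffer> below us, "env" the Environment.
BandwidthLimitingTransportLayer<InetSocketAddress> perSecond =
    new BandwidthLimitingTransportLayer<InetSocketAddress>(
        wire, 10*1024, 1000, env); // 10K bucket, refilled every 1000 ms

BandwidthLimitingTransportLayer<InetSocketAddress> smoother =
    new BandwidthLimitingTransportLayer<InetSocketAddress>(
        wire, 1024, 100, env);     // 1K bucket, refilled every 100 ms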
Limit Message Bandwidth
Now let's limit the outgoing message bandwidth. We are going to throw a NotEnoughBandwidthException
if there isn't sufficient bandwidth to send a message.
There are three things we must do here.
- Subtract from the bucket or throw the exception.
- Return a proper Message receipt so the task can be cancelled.
- Acknowledge the message when the lower transport layer acknowledges it to us.
We will call deliverAckToMe.sendFailed() when there isn't enough bandwidth. Note: the MessageCallback may be null, so we must check that it isn't before calling sendFailed(); otherwise we will get a NullPointerException. Also, so we can see when this occurs while running the code, we will log whenever a message is dropped.
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(
    Identifier i, ByteBuffer m,
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(null, new NotEnoughBandwidthException(bucket, m.remaining()));
    return null;
  }
  // for now, just pass the send straight through to the lower layer
  return tl.sendMessage(i, m, deliverAckToMe, options);
}

Now we will add code to return a proper MessageRequestHandle, as well as supply that MessageRequestHandle when the message succeeds or fails. The release already includes a generic implementation of the MessageRequestHandle: org.mpisws.p2p.transport.util.MessageRequestHandleImpl. Let's take a look at it:
The constructor initializes these 3 fields:
Identifier identifier;
MessageType msg;
Map<String, Object> options;

There are also corresponding getters. However, we still need to be able to cancel() the operation in the next transport layer. This requires a 4th field:

Cancellable subCancellable;

public boolean cancel() {
  if (subCancellable == null) return false;
  return subCancellable.cancel();
}

The subCancellable is initialized with a call to setSubCancellable().
Here is the code that now properly returns a MessageRequestHandle:

public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m,
    MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
    new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
    return returnMe;
  }
  returnMe.setSubCancellable(tl.sendMessage(i, m, deliverAckToMe, options));
  return returnMe;
}

Note how we call returnMe.setSubCancellable() with the result of the call to the lower transport layer.
There is one more problem in the code. Because we simply pass through the deliverAckToMe field, when deliverAckToMe.ack() or deliverAckToMe.sendFailed() is called by the lower layer, it will be given the wrong MessageRequestHandle. Thus, we need to create our own MessageCallback which wraps deliverAckToMe and substitutes returnMe.
First, we declare deliverAckToMe and returnMe final.
Second, we create an anonymous inner class implementing MessageCallback.
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m,
    final MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
    new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  ...
  returnMe.setSubCancellable(tl.sendMessage(i, m, new MessageCallback<Identifier, ByteBuffer>() {
    public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
      if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
    }
    public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
      if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
    }
  }, options));
  return returnMe;
}

Our MessageCallback simply calls deliverAckToMe's ack()/sendFailed() methods with returnMe.
Here is the full code for the method:
public MessageRequestHandle<Identifier, ByteBuffer> sendMessage(Identifier i, ByteBuffer m,
    final MessageCallback<Identifier, ByteBuffer> deliverAckToMe,
    Map<String, Object> options) {
  final MessageRequestHandleImpl<Identifier, ByteBuffer> returnMe =
    new MessageRequestHandleImpl<Identifier, ByteBuffer>(i, m, options);
  boolean success = true;
  synchronized(this) {
    if (m.remaining() > bucket) {
      success = false;
    } else {
      bucket -= m.remaining();
    }
  }
  if (!success) {
    if (logger.level <= Logger.FINE) logger.log("Dropping message "+m+" because not enough bandwidth:"+bucket);
    if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, new NotEnoughBandwidthException(bucket, m.remaining()));
    return returnMe;
  }
  returnMe.setSubCancellable(tl.sendMessage(i, m, new MessageCallback<Identifier, ByteBuffer>() {
    public void ack(MessageRequestHandle<Identifier, ByteBuffer> msg) {
      if (deliverAckToMe != null) deliverAckToMe.ack(returnMe);
    }
    public void sendFailed(MessageRequestHandle<Identifier, ByteBuffer> msg, IOException reason) {
      if (deliverAckToMe != null) deliverAckToMe.sendFailed(returnMe, reason);
    }
  }, options));
  return returnMe;
}

This may seem a bit overwhelming right now, but this code provides a lot of flexibility for the transport layer while still keeping the calls simple.
Limit Socket Bandwidth
The last major step is making the sockets also respect the bandwidth limitation. Rather than throwing an exception when we exceed the bandwidth, we just need to throttle the traffic by only sending what we are allowed. To do this, we will create a P2PSocket that decrements the bucket on each write. As with the MessageRequestHandleImpl, the release already includes an implementation of a P2PSocket that wraps another: org.mpisws.p2p.transport.util.SocketWrapperSocket.
The generic parameters of the SocketWrapperSocket are the two kinds of Identifiers that it adapts. Since we are importing and exporting the same Identifier, we declare our socket like this:
class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
}

Now we need to set up the constructor. We provide the wrapped socket's identifier and options, and we must also pass in our logger.

public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {
  super(socket.getIdentifier(), socket,
        BandwidthLimitingTransportLayer.this.logger, socket.getOptions());
}

Now we need to override these two methods:
@Override
public long write(ByteBuffer srcs) throws IOException { ... }

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { ... }

Let's start with the first method. Here is the easy case, where we accept the write() or log a message:
if (srcs.remaining() <= bucket) {
  long ret = super.write(srcs);
  if (ret >= 0) { // EOF is usually -1
    // synchronize on the enclosing layer, which owns the bucket
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket -= ret;
    }
  }
  return ret;
}
if (logger.level <= Logger.FINE) logger.log("Limiting "+socket+" to "+bucket+" bytes.");

The rest is more complicated, because it is important to leave the incoming ByteBuffer's position in the proper place. We create a ByteBuffer temp over the same backing array, limited to the bandwidth left in the bucket:
// we're trying to write more than we can, so we need to create a new ByteBuffer
// we have to be careful to set srcs' position properly when we are done,
// so record the original position (the cast is needed because bucket is a long)
int originalPosition = srcs.position();
ByteBuffer temp = ByteBuffer.wrap(srcs.array(), originalPosition, (int)bucket);
srcs
buffer is in its original position, so we don't need to do anything but return/throw exception:
// try to write our temp buffer
long ret = super.write(temp);
if (ret < 0) {
  // there was a problem; srcs is untouched, so just return
  return ret;
}
srcs
ByteBuffer:
// allocate the bandwidth
synchronized(BandwidthLimitingTransportLayer.this) {
  bucket -= ret;
}
// we wrote through temp, so srcs' position was not advanced;
// set it to reflect how much was actually written
srcs.position(originalPosition+(int)ret);
return ret;

The code for the other write() method is similar, but setting the position is more complicated. The first part is similar to before: add up how much the caller is trying to send, and call super if we aren't overflowing.
// calculate how much they are trying to write:
int toWrite = 0;
for (int i = offset; i < offset+length; i++) {
  toWrite += srcs[i].remaining();
}
int tempBucket = (int)bucket; // snapshot the bucket, so we don't get confused by synchronization
if (toWrite <= tempBucket) {
  long ret = super.write(srcs, offset, length);
  if (ret >= 0) { // EOF is usually -1
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket -= ret;
    }
  }
  return ret;
}
if (logger.level <= Logger.FINE) logger.log("Limiting "+socket+" to "+bucket+" bytes.");

Note: We use the variable tempBucket so we don't get confused if the bucket is refilled asynchronously. The strategy now is to make a copy of the srcs array. This code finds the ByteBuffer that causes the overflow; we replace that entry, in the copy, with a temporary buffer as in the above example.
// we're trying to write more than we can, so we substitute a new ByteBuffer
// at the overflowing position, with its length set so we stay within the bucket
// as before, we have to be careful to set the caller's position properly when we are done
ByteBuffer[] temp = new ByteBuffer[srcs.length];
System.arraycopy(srcs, 0, temp, 0, srcs.length);
int myLength = length; // we'll pass this to the call below
int myIndex = 0;
toWrite = 0; // reset this one
for (int i = offset; i < offset+length; i++) {
  int next = srcs[i].remaining();
  if (next+toWrite > tempBucket) {
    // we have the overflow at this slot
    myLength = i-offset+1;
    myIndex = i;
    // replace it, in our copy, with a temporary (shorter) ByteBuffer
    temp[i] = ByteBuffer.wrap(srcs[i].array(), srcs[i].position(), tempBucket-toWrite);
    break;
  }
  toWrite+=next;
}

This code looks like the previous example, but we may need to advance the position of the buffer we replaced.
// try to write our temp array
long ret = super.write(temp, offset, myLength);
if (ret < 0) {
  // there was a problem
  return ret;
}
// allocate the bandwidth
synchronized(BandwidthLimitingTransportLayer.this) {
  bucket -= ret;
}
// we need to properly set the position on the buffer we replaced:
// temp[myIndex] wraps the same array starting at srcs[myIndex]'s old position,
// so after the write its position tells us how far srcs[myIndex] has advanced
srcs[myIndex].position(temp[myIndex].position());
return ret;

We're not done yet. Because we simply pass through the request to write, this will cause an infinite loop when we run out of bandwidth. It goes something like this:
- Client: socket.register() // request to write (passed through to the lower layer).
- Lower layer: receiver.receiveSelectResult() // granted request to write
- Client: socket.write()
- BandwidthLimitingLayer: limiting bandwidth to 0 bytes, write returns 0
- repeat -- The user code didn't make any progress, so obviously it will register again.
To solve this, we will do two things:
- Intercept the call to socket.register().
- Have our refill task notify the sockets so they can re-register later.
First, we add a field to BandwidthLimitingSocket to store the requesting receiver:
/**
 * Store the write requestor.
 */
P2PSocketReceiver<Identifier> storedReceiver;

Here is the code to intercept socket.register() in BandwidthLimitingSocket. If we are out of bandwidth and the user wants to write, then we cache the receiver in the storedReceiver variable.
@Override
public void register(boolean wantToRead, boolean wantToWrite, P2PSocketReceiver<Identifier> receiver) {
  // this variable is what we will pass to super.register()
  boolean myWantToWrite = wantToWrite;
  // if the user wants to write, and the bucket is empty, set our temp variable to false
  if (wantToWrite == true && bucket == 0) {
    myWantToWrite = false;
    storedReceiver = receiver;
  }
  // only call super.register() if we have something to do
  if (wantToRead || myWantToWrite) super.register(wantToRead, myWantToWrite, receiver);
}

Now, our refill task needs to notify all sockets so they can register to write if they have a storedReceiver.
We need to keep track of all of our BandwidthLimitingSockets in a Collection called sockets. Whenever we create a BandwidthLimitingSocket we add it to sockets, and whenever we shutdownOutput() or close() the socket, we remove it.
/**
 * Keep track of all of the BandwidthLimitingSockets.
 */
Collection<BandwidthLimitingSocket> sockets = new ArrayList<BandwidthLimitingSocket>();

class BandwidthLimitingSocket extends SocketWrapperSocket<Identifier, Identifier> {
  public BandwidthLimitingSocket(P2PSocket<Identifier> socket) {
    super(socket.getIdentifier(), socket,
          BandwidthLimitingTransportLayer.this.logger, socket.getOptions());
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.add(this);
    }
  }

  public void close() {
    super.close();
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.remove(this);
    }
  }

  public void shutdownOutput() {
    super.shutdownOutput();
    synchronized(BandwidthLimitingTransportLayer.this) {
      sockets.remove(this);
    }
  }

We also need a method for the refill task to call.
/**
 * Register and clear the storedReceiver.
 */
public void notifyBandwidthRefilled() {
  if (storedReceiver != null) {
    P2PSocketReceiver<Identifier> temp = storedReceiver;
    storedReceiver = null;
    super.register(false, true, temp);
  }
}

Now modify the refill task to call notifyBandwidthRefilled(). This loop goes inside the refill TimerTask that we created in the BandwidthLimitingTransportLayer constructor:
for (BandwidthLimitingSocket s : sockets) {
  s.notifyBandwidthRefilled();
}

Phew, that was a lot of work. Now that we have our BandwidthLimitingSocket, we need to use it.
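For reference, here is a sketch of what the refill task in the constructor looks like once it both refills the bucket and wakes up any stalled writers; it is assembled from the snippets above rather than copied from the release.

// Refill task, now also notifying sockets that stashed a write request
// while the bucket was empty.
environment.getSelectorManager().getTimer().schedule(new TimerTask(){
  @Override
  public void run() {
    synchronized(BandwidthLimitingTransportLayer.this) {
      bucket = BUCKET_SIZE;
      // re-register any writer that register() turned away
      for (BandwidthLimitingSocket s : sockets) {
        s.notifyBandwidthRefilled();
      }
    }
  }
}, 0, BUCKET_TIME_LIMIT);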
There are two ways to get a socket.
- When the upper layer opens a socket.
- When the lower layer accepts a socket.
openSocket():
Here is the code for openSocket(). We don't get the socket right away; we have to wait until it has completed opening, which is similar to a continuation. The first thing we have to do is create a SocketRequestHandle, for the same reasons as we did in the above code with MessageRequestHandle.
SocketRequestHandleImpl<Identifier> returnMe =
  new SocketRequestHandleImpl<Identifier>(i, options);
returnMe.setSubCancellable(tl.openSocket(i, ... , options));
return returnMe;

Now we ask the lower transport layer to open the socket, and then wrap it with our BandwidthLimitingSocket, which we deliver to deliverSocketToMe. If there is an exception, we just pass it up to the previous layer. Note that deliverSocketToMe must be non-null, because it's not useful to request opening a socket if you never receive a handle to it.
tl.openSocket(i, new SocketCallback<Identifier>(){
  public void receiveResult(SocketRequestHandle<Identifier> cancellable, P2PSocket<Identifier> sock) {
    deliverSocketToMe.receiveResult(returnMe, new BandwidthLimitingSocket(sock));
  }
  public void receiveException(SocketRequestHandle<Identifier> s, IOException ex) {
    deliverSocketToMe.receiveException(returnMe, ex);
  }
}, options)
incomingSocket():
Here is the code for incomingSocket(). First, we need a callback to deliver the socket to. It will be set later, when the layer above us calls the setCallback() method from TransportLayer.

TransportLayerCallback<Identifier, ByteBuffer> callback;

public void setCallback(TransportLayerCallback<Identifier, ByteBuffer> callback) {
  this.callback = callback;
}

Now it is simple to override incomingSocket():

public void incomingSocket(P2PSocket<Identifier> s) throws IOException {
  callback.incomingSocket(new BandwidthLimitingSocket(s));
}

The last step is to add the rest of the methods from TransportLayer and TransportLayerCallback. These just forward the calls down or up as appropriate:
public void acceptMessages(boolean b) { tl.acceptMessages(b); }
public void acceptSockets(boolean b) { tl.acceptSockets(b); }
public Identifier getLocalIdentifier() { return tl.getLocalIdentifier(); }
public void setErrorHandler(ErrorHandler<Identifier> handler) { tl.setErrorHandler(handler); }
public void destroy() { tl.destroy(); }

public void messageReceived(Identifier i, ByteBuffer m, Map<String, Object> options) throws IOException {
  callback.messageReceived(i, m, options);
}
Integration with the SocketPastryNodeFactory
Where should we put this layer? We will show two options. Because we are working at the Java level, we already know we can't fully account for TCP's own overhead (retransmissions etc.) in the bandwidth. However, to get the maximum effect, we should place the layer just above Wire. Here, we show how to do this by extending SocketPastryNodeFactory. The SocketPastryNodeFactory in FreePastry version 2.1 is much more extensible than before: there is a get...TransportLayer() call for each layer that it constructs. To insert a different layer, simply override one of these calls and wrap the default layer with the new one. When the SocketPastryNodeFactory constructs the lowest layer, it calls getWireTransportLayer(). We first construct the default layer by calling super.getWireTransportLayer(), and then return our bandwidth-limiting layer wrapping the wire layer.
public static PastryNodeFactory exampleA(int bindport, Environment env,
    NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
  PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
    @Override
    protected TransportLayer<InetSocketAddress, ByteBuffer> getWireTransportLayer(
        InetSocketAddress innermostAddress, TLPastryNode pn) throws IOException {
      // get the default layer
      TransportLayer<InetSocketAddress, ByteBuffer> wtl =
        super.getWireTransportLayer(innermostAddress, pn);
      // wrap it with our layer
      return new BandwidthLimitingTransportLayer<InetSocketAddress>(
        wtl, amt, time, pn.getEnvironment());
    }
  };
  return factory;
}

You can replace the construction of the SocketPastryNodeFactory at the beginning of any of the existing tutorials with this code to add the "bandwidth-limiting" feature. Perhaps we don't want to include some of FreePastry's overhead in our bandwidth limitation. If we put the layer above the SourceRouteManager, we don't count bandwidth for liveness checks, nor the overhead of constructing source routes. However, the SourceRouteManager also performs the additional functions of providing Liveness and Proximity. In
exampleB()
we show how easy it is to replace only the TransportLayer functionality of the SourceRouteManager while still returning the existing TransportLayer for the Liveness and Proximity functionality. The returned object for getSourceRouteManagerLayer()
is a TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>. This is a very simple interface that returns 3 objects:
protected interface TransLivenessProximity {
TransportLayer getTransportLayer();
LivenessProvider getLivenessProvider();
ProximityProvider getProximityProvider();
}
Here is the code for exampleB():
public static PastryNodeFactory exampleB(int bindport, Environment env, NodeIdFactory nidFactory, final int amt, final int time) throws IOException {
PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env) {
@Override
protected TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> getSourceRouteManagerLayer(
TransportLayer<SourceRoute<MultiInetSocketAddress>, ByteBuffer> ltl,
LivenessProvider<SourceRoute<MultiInetSocketAddress>> livenessProvider,
Pinger<SourceRoute<MultiInetSocketAddress>> pinger,
TLPastryNode pn,
MultiInetSocketAddress proxyAddress,
MultiAddressSourceRouteFactory esrFactory) throws IOException {
final TransLivenessProximity<MultiInetSocketAddress, ByteBuffer> srm = super.getSourceRouteManagerLayer(
ltl, livenessProvider, pinger, pn, proxyAddress, esrFactory);
final BandwidthLimitingTransportLayer<MultiInetSocketAddress> bll = new BandwidthLimitingTransportLayer<MultiInetSocketAddress>(
srm.getTransportLayer(), amt, time, pn.getEnvironment());
return new TransLivenessProximity<MultiInetSocketAddress, ByteBuffer>(){
public TransportLayer<MultiInetSocketAddress, ByteBuffer> getTransportLayer() {
return bll;
}
public LivenessProvider<MultiInetSocketAddress> getLivenessProvider() {
return srm.getLivenessProvider();
}
public ProximityProvider<MultiInetSocketAddress> getProximityProvider() {
return srm.getProximityProvider();
}
};
}
};
return factory;
}
Running the code
We modified DistTutorial from lesson4 to take some more parameters and call BandwidthLimitingTransportLayer.exampleA()
.
public DistTutorial(int bindport, InetSocketAddress bootaddress, int numNodes, Environment env, int bandwidth) throws Exception {
...
// construct the PastryNodeFactory, this is how we use rice.pastry.socket
PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleA(bindport, env, nidFactory, bandwidth, 1000);
// PastryNodeFactory factory = BandwidthLimitingTransportLayer.exampleB(bindport, env, nidFactory, bandwidth, 1000);
// PastryNodeFactory factory = new SocketPastryNodeFactory(nidFactory, bindport, env);
...
}
/**
* Usage:
* java [-cp FreePastry-<version>.jar] rice.tutorial.transportlayer.DistTutorial localbindport bootIP bootPort numNodes bandwidth;
* example java rice.tutorial.transportlayer.DistTutorial 9001 pokey.cs.almamater.edu 9001 10 1000
*/
public static void main(String[] args) throws Exception {
...
}
We need to turn on FINE logging in our transport layer, and let's also disable ProximityNeighborSelection:
// Disable PNS for our example
env.getParameters().setBoolean("transport_use_pns", false);
// enable logging on our new layer
env.getParameters().setInt("rice.tutorial.transportlayer.BandwidthLimitingTransportLayer_loglevel", Logger.FINE);
Now let's run the code; we must provide the bandwidth. Let's try 10K/second:
java -cp .:FreePastry-@freepastry_version@.jar rice.tutorial.transportlayer.DistTutorial 5009 10.9.8.7 5009 10 10000
Finished creating new node TLPastryNode[SNH: <0x9492E7..>/FOO/10.9.8.7:5009]
Finished creating new node TLPastryNode[SNH: <0x44C3E7..>/FOO/10.9.8.7:5010]
Finished creating new node TLPastryNode[SNH: <0xBE1C77..>/FOO/10.9.8.7:5011]
Finished creating new node TLPastryNode[SNH: <0x4C94DA..>/FOO/10.9.8.7:5012]
Finished creating new node TLPastryNode[SNH: <0x3FAE2B..>/FOO/10.9.8.7:5013]
Finished creating new node TLPastryNode[SNH: <0xA5DBDC..>/FOO/10.9.8.7:5014]
Finished creating new node TLPastryNode[SNH: <0x99460E..>/FOO/10.9.8.7:5015]
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632375:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2771 remote=/10.9.8.7:5014] to 315 bytes.
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xB5DAB7:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222632390:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0xB5DAB7..>/FOO/10.9.8.7:5016]
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634218:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:5017 remote=/10.9.8.7:2775] to 283 bytes.
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634234:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0xC3183D:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222634250:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0xC3183D..>/FOO/10.9.8.7:5017]
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2786 remote=/10.9.8.7:5011] to 410 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2789 remote=/10.9.8.7:5015] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Limiting SM java.nio.channels.SocketChannel[connected local=/10.9.8.7:2788 remote=/10.9.8.7:5010] to 0 bytes.
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635843:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
0x317CF2:rice.tutorial.transportlayer.BandwidthLimitingTransportLayer:1190222635859:Dropping message java.nio.HeapByteBuffer[pos=0 lim=92 cap=92] because not enough bandwidth:0
Finished creating new node TLPastryNode[SNH: <0x317CF2..>/FOO/10.9.8.7:5018]
MyApp <0x9492E7..> sending to <0x1CACFE..>
MyApp <0x317CF2..> received MyMsg from <0x9492E7..> to <0x1CACFE..>
...
Note the logging of "Limiting SM XXX to XXX bytes." and "Dropping message XXX because not enough bandwidth:XXX". However, the code still runs successfully.
Congratulations! You have successfully extended FreePastry's transport layer.