So when we accept a request from a peer, and that request completes, we
then allocate the peer another token?
On Thu, Nov 02, 2006 at 06:50:37PM +0000, mrogers at freenetproject.org wrote:
> Author: mrogers
> Date: 2006-11-02 18:50:28 +0000 (Thu, 02 Nov 2006)
> New Revision: 10797
>
> Added:
> trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/Node.java
> trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> trunk/apps/load-balancing-sims/phase7/sim/Sim.java
> trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> Log:
> Token passing (not tested yet)
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02
> 18:50:28 UTC (rev 10797)
> @@ -11,6 +11,10 @@
> // Coarse-grained retransmission timer
> public final static double RETX_TIMER = 0.1; // Seconds
>
> + // Flow control
> + public final static int FLOW_TOKENS = 20; // Shared by all peers
> + public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
> +
> public double location; // Routing location
> public NetworkInterface net;
> private HashMap<Integer,Peer> peers; // Look up a peer by its address
> @@ -25,6 +29,7 @@
> private boolean decrementMinHtl = false;
> public TokenBucket bandwidth; // Bandwidth limiter
> private boolean timerRunning = false;
> + private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
>
> public Node (double txSpeed, double rxSpeed)
> {
> @@ -45,7 +50,9 @@
> pubKeyCache = new LruCache<Integer> (16000);
> if (Math.random() < 0.5) decrementMaxHtl = true;
> if (Math.random() < 0.25) decrementMinHtl = true;
> - bandwidth = new TokenBucket (15000, 60000);
> + bandwidth = new TokenBucket (40000, 60000);
> + // Allocate flow control tokens after a short delay
> + Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
> }
>
> // Return true if a connection was added, false if already connected
> @@ -173,7 +180,9 @@
> public void handleMessage (Message m, Peer src)
> {
> if (src != null) log ("received " + m + " from " + src);
> - if (m instanceof ChkRequest)
> + if (m instanceof Token)
> + handleToken ((Token) m, src);
> + else if (m instanceof ChkRequest)
> handleChkRequest ((ChkRequest) m, src);
> else if (m instanceof ChkInsert)
> handleChkInsert ((ChkInsert) m, src);
> @@ -188,6 +197,11 @@
> }
> }
>
> + private void handleToken (Token t, Peer prev)
> + {
> + prev.tokensOut += t.id; // t.id is the number of tokens
> + }
> +
> private void handleChkRequest (ChkRequest r, Peer prev)
> {
> if (!recentlySeenRequests.add (r.id)) {
> @@ -198,6 +212,7 @@
> if (mh != null) mh.removeNextHop (prev);
> return;
> }
> + if (!getToken (prev)) return;
> // Accept the search
> if (prev != null) {
> log ("accepting " + r);
> @@ -212,6 +227,7 @@
> for (int i = 0; i < 32; i++)
> prev.sendMessage (new Block (r.id, i));
> }
> + allocateToken (prev);
> return;
> }
> log ("key " + r.key + " not found in CHK store");
> @@ -224,6 +240,7 @@
> for (int i = 0; i < 32; i++)
> prev.sendMessage (new Block (r.id, i));
> }
> + allocateToken (prev);
> return;
> }
> log ("key " + r.key + " not found in CHK cache");
> @@ -243,6 +260,7 @@
> if (mh != null) mh.removeNextHop (prev);
> return;
> }
> + if (!getToken (prev)) return;
> // Accept the search
> if (prev != null) {
> log ("accepting " + i);
> @@ -264,6 +282,7 @@
> if (mh != null) mh.removeNextHop (prev);
> return;
> }
> + if (!getToken (prev)) return;
> // Look up the public key
> boolean pub = pubKeyCache.get (r.key);
> if (pub) log ("public key " + r.key + " found in cache");
> @@ -284,6 +303,7 @@
> prev.sendMessage
> (new SskPubKey (r.id, r.key));
> }
> + allocateToken (prev);
> return;
> }
> log ("key " + r.key + " not found in SSK store");
> @@ -298,6 +318,7 @@
> prev.sendMessage
> (new SskPubKey (r.id, r.key));
> }
> + allocateToken (prev);
> return;
> }
> log ("key " + r.key + " not found in SSK cache");
> @@ -317,6 +338,7 @@
> if (mh != null) mh.removeNextHop (prev);
> return;
> }
> + if (!getToken (prev)) return;
> // Look up the public key
> boolean pub = pubKeyCache.get (i.key);
> if (pub) log ("public key " + i.key + " found in cache");
> @@ -336,9 +358,45 @@
> {
> MessageHandler mh = messageHandlers.remove (id);
> if (mh == null) log ("no message handler to remove for " + id);
> - else log ("removing message handler for " + id);
> + else {
> + log ("removing message handler for " + id);
> + allocateToken (mh.prev);
> + }
> }
>
> + // Check whether the peer sending a request or insert has enough tokens
> + private boolean getToken (Peer p)
> + {
> + if (p == null) {
> + if (spareTokens == 0) {
> + // The client will have to wait
> + log ("not enough tokens");
> + return false;
> + }
> + spareTokens--;
> + return true;
> + }
> + else {
> + if (p.tokensIn == 0) {
> + // This indicates a misbehaving sender
> + log ("WARNING: not enough tokens");
> + return false;
> + }
> + p.tokensIn--;
> + return true;
> + }
> + }
> +
> + // Give another token to the peer whose request/insert just completed
> + private void allocateToken (Peer p)
> + {
> + if (p == null) spareTokens++;
> + else {
> + p.tokensIn++;
> + p.sendMessage (new Token (1));
> + }
> + }
> +
> // Return the list of peers in a random order
> public ArrayList<Peer> peers()
> {
> @@ -352,16 +410,14 @@
> Event.log (net.address + " " + message);
> }
>
> - // Event callbacks
> -
> - private void generateChkRequest (int key)
> + public void generateChkRequest (int key)
> {
> ChkRequest cr = new ChkRequest (key, location);
> log ("generating " + cr);
> handleChkRequest (cr, null);
> }
>
> - private void generateChkInsert (int key)
> + public void generateChkInsert (int key)
> {
> ChkInsert ci = new ChkInsert (key, location);
> log ("generating " + ci);
> @@ -371,14 +427,14 @@
> handleMessage (new Block (ci.id, i), null);
> }
>
> - private void generateSskRequest (int key)
> + public void generateSskRequest (int key)
> {
> SskRequest sr = new SskRequest (key, location, true);
> log ("generating " + sr);
> handleSskRequest (sr, null);
> }
>
> - private void generateSskInsert (int key, int value)
> + public void generateSskInsert (int key, int value)
> {
> SskInsert si = new SskInsert (key, value, location);
> log ("generating " + si);
> @@ -397,6 +453,18 @@
> else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
> }
>
> + // Allocate all flow control tokens at startup
> + private void allocateTokens()
> + {
> + // Rounding error in your favour - collect 50 tokens
> + int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
> + for (Peer p : peers.values()) {
> + p.tokensIn += tokensPerPeer;
> + spareTokens -= tokensPerPeer;
> + p.sendMessage (new Token (tokensPerPeer));
> + }
> + }
> +
> // EventTarget interface
> public void handleEvent (int type, Object data)
> {
> @@ -424,6 +492,10 @@
> case CHECK_TIMEOUTS:
> checkTimeouts();
> break;
> +
> + case ALLOCATE_TOKENS:
> + allocateTokens();
> + break;
> }
> }
>
> @@ -433,4 +505,5 @@
> public final static int INSERT_SSK = 4;
> public final static int SSK_COLLISION = 5;
> private final static int CHECK_TIMEOUTS = 6;
> + private final static int ALLOCATE_TOKENS = 7;
> }
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02
> 18:50:28 UTC (rev 10797)
> @@ -42,6 +42,10 @@
> private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
> private int rxSeq = 0; // Sequence number of next in-order incoming pkt
>
> + // Flow control
> + public int tokensOut = 0; // How many requests/inserts can we send?
> + public int tokensIn = 0; // How many requests/inserts should we accept?
> +
> public Peer (Node node, int address, double location, double latency)
> {
> this.node = node;
> @@ -201,7 +205,7 @@
> sendAck (p.seq);
> }
> // This indicates a misbehaving sender - discard the packet
> - else log ("warning: sequence number out of range");
> + else log ("WARNING: sequence number out of range");
> }
>
> private void handleAck (Ack a)
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02
> 18:50:28 UTC (rev 10797)
> @@ -5,7 +5,7 @@
> {
> private final int NODES = 100; // Number of nodes
> private final int DEGREE = 5; // Average degree
> - private final double SPEED = 40000; // Network speed, bytes per second
> + private final double SPEED = 15000; // Network speed, bytes per second
> private final double LATENCY = 0.1; // Latency of all links in seconds
> private final int INSERTS = 100; // Number of inserts per publisher
> private Node[] nodes;
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -36,7 +36,7 @@
> {
> // Insert a random key
> int key = Node.locationToKey (Math.random());
> - Event.schedule (node, 0.0, Node.INSERT_CHK, key);
> + node.generateChkInsert (key);
> // Inform each reader after an average of ten minutes
> for (Node n : readers) {
> double delay = 595.0 + Math.random() * 10.0;
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -62,7 +62,7 @@
> // Start the search
> forwardSearch();
> // If we have all the blocks and the headers, consider finishing
> - if (blocksReceived == 32 && inState == TRANSFERRING) {
> + if (blocksReceived == 32) {
> inState = COMPLETED;
> considerFinishing();
> }
> @@ -110,6 +110,7 @@
> private void handleRejectedLoop (RejectedLoop rl)
> {
> if (searchState != SENT) node.log (rl + " out of order");
> + next.tokensOut++; // No token was consumed
> forwardSearch();
> }
>
> @@ -158,6 +159,9 @@
> > Node.distance (target, closest))
> htl = node.decrementHtl (htl);
> node.log (this + " has htl " + htl);
> + // Consume a token
> + next.tokensOut--;
> + // Forward the search
> node.log ("forwarding " + this + " to " + next.address);
> next.sendMessage (makeSearchMessage());
> nexts.remove (next);
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -42,7 +42,7 @@
> searchState = TRANSFERRING;
> if (prev != null) prev.sendMessage (df); // Forward the message
> // If we have all the blocks and the headers, cache the data
> - if (blocksReceived == 32 && searchState == TRANSFERRING) {
> + if (blocksReceived == 32) {
> node.cacheChk (key);
> if (prev == null) node.log (this + " succeeded");
> finish();
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -21,8 +21,8 @@
> protected double closest; // The closest location seen so far
> protected int htl; // Hops to live for backtracking
>
> - protected Node node; // The owner of this MessageHandler
> - protected Peer prev; // The previous hop of the search
> + public final Node node; // The owner of this MessageHandler
> + public final Peer prev; // The previous hop of the search
> protected Peer next = null; // The (current) next hop of the search
> protected LinkedList<Peer> nexts; // Candidates for the next hop
> protected int searchState = STARTED; // The state of the search
> @@ -60,13 +60,17 @@
> double closestDist = Double.POSITIVE_INFINITY;
> Peer closestPeer = null;
> for (Peer peer : nexts) {
> + if (peer.tokensOut == 0) {
> + node.log ("bypassing busy peer " + peer);
> + continue;
> + }
> double dist = Node.distance (keyLoc, peer.location);
> if (dist < closestDist) {
> closestDist = dist;
> closestPeer = peer;
> }
> }
> - return closestPeer; // Null if the list was empty
> + return closestPeer; // Null if there are no suitable peers
> }
>
> public abstract void handleMessage (Message m, Peer src);
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -28,6 +28,7 @@
> protected void handleRejectedLoop (RejectedLoop rl)
> {
> if (searchState != SENT) node.log (rl + " out of order");
> + next.tokensOut++; // No token was consumed
> forwardSearch();
> }
>
> @@ -73,6 +74,9 @@
> > Node.distance (target, closest))
> htl = node.decrementHtl (htl);
> node.log (this + " has htl " + htl);
> + // Consume a token
> + next.tokensOut--;
> + // Forward the search
> node.log ("forwarding " + this + " to " + next.address);
> next.sendMessage (makeSearchMessage());
> nexts.remove (next);
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -89,6 +89,7 @@
> private void handleRejectedLoop (RejectedLoop rl)
> {
> if (searchState != SENT) node.log (rl + " out of order");
> + next.tokensOut++; // No token was consumed
> forwardSearch();
> }
>
> @@ -142,6 +143,9 @@
> > Node.distance (target, closest))
> htl = node.decrementHtl (htl);
> node.log (this + " has htl " + htl);
> + // Consume a token
> + next.tokensOut--;
> + // Forward the search
> node.log ("forwarding " + this + " to " + next.address);
> next.sendMessage (makeSearchMessage());
> nexts.remove (next);
>
> Added: trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -0,0 +1,14 @@
> +package sim.messages;
> +
> +public class Token extends Message
> +{
> + public Token (int tokens)
> + {
> + id = tokens; // Space-saving hack
> + }
> +
> + public String toString()
> + {
> + return new String (id + " tokens");
> + }
> +}
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20061102/20284431/attachment.pgp>