+ // If there are no urgent acks, and no urgent messages
or no
+ // room to send them, and not enough messages for a
large
+ // packet or no room to send a large packet, give up!
+ if (ackQueue.deadline() > now
+ && (searchQueue.deadline() > now
+ || searchQueue.headSize() > available)
So we hold search messages forever if there is no transfer traffic?
On Wed, Nov 01, 2006 at 05:11:30PM +0000, mrogers at freenetproject.org wrote:
> Author: mrogers
> Date: 2006-11-01 17:11:27 +0000 (Wed, 01 Nov 2006)
> New Revision: 10775
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/Event.java
> trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
> trunk/apps/load-balancing-sims/phase7/sim/Node.java
> trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> Log:
> Refactored interleaving, coalescing, congestion control and bandwidth limiter
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01
> 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01
> 17:11:27 UTC (rev 10775)
> @@ -7,6 +7,7 @@
>
> private static TreeSet<Event> queue = new TreeSet<Event>();
> private static double clockTime = 0.0;
> + private static double lastLogTime = Double.POSITIVE_INFINITY;
> private static int nextId = 0;
> public static double duration = Double.POSITIVE_INFINITY;
>
> @@ -14,12 +15,13 @@
> {
> queue.clear();
> clockTime = 0.0;
> + lastLogTime = Double.POSITIVE_INFINITY;
> nextId = 0;
> duration = Double.POSITIVE_INFINITY;
> }
>
> public static void schedule (EventTarget target, double time,
> - int type, Object data)
> + int type, Object data)
> {
> queue.add (new Event (target, time + clockTime, type, data));
> }
> @@ -50,6 +52,9 @@
>
> public static void log (String message)
> {
> + // Print a blank line between events
> + if (clockTime > lastLogTime) System.out.println();
> + lastLogTime = clockTime;
> System.out.print (clockTime + " " + message + "\n");
> }
>
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
> 2006-11-01 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
> 2006-11-01 17:11:27 UTC (rev 10775)
> @@ -122,7 +122,6 @@
> }
> }
>
> - // Each EventTarget class has its own event codes
> public final static int RX_QUEUE = 1;
> private final static int RX_END = 2;
> private final static int TX_END = 3;
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01
> 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01
> 17:11:27 UTC (rev 10775)
> @@ -8,6 +8,9 @@
>
> public class Node implements EventTarget
> {
> + // Coarse-grained retransmission timer
> + public final static double RETX_TIMER = 0.1; // Seconds
> +
> public double location; // Routing location
> public NetworkInterface net;
> private HashMap<Integer,Peer> peers; // Look up a peer by its address
> @@ -21,7 +24,7 @@
> private boolean decrementMaxHtl = false;
> private boolean decrementMinHtl = false;
> public TokenBucket bandwidth; // Bandwidth limiter
> - private boolean timerRunning = false; // Is the retx timer running?
> + private boolean timerRunning = false;
>
> public Node (double txSpeed, double rxSpeed)
> {
> @@ -149,13 +152,13 @@
> pubKeyCache.put (key);
> }
>
> - // Called by Peer
> + // Called by Peer after transmitting a packet
> public void startTimer()
> {
> if (timerRunning) return;
> - // log ("starting retransmission/coalescing timer");
> - Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
> timerRunning = true;
> + log ("starting retransmission timer");
> + Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
> }
>
> // Called by NetworkInterface
> @@ -385,20 +388,13 @@
>
> private void checkTimeouts()
> {
> - // Check the peers in a random order each time
> - double deadline = Double.POSITIVE_INFINITY;
> - for (Peer p : peers())
> - deadline = Math.min (deadline, p.checkTimeouts());
> - if (deadline == Double.POSITIVE_INFINITY) {
> - // log ("stopping retransmission/coalescing timer");
> + boolean stopTimer = true;
> + for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
> + if (stopTimer) {
> + log ("stopping retransmission timer");
> timerRunning = false;
> }
> - else {
> - double sleep = deadline - Event.time();
> - if (sleep < Peer.MIN_SLEEP) sleep = Peer.MIN_SLEEP;
> - // log ("sleeping for " + sleep + " seconds");
> - Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
> - }
> + else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
> }
>
> // EventTarget interface
> @@ -423,6 +419,7 @@
>
> case SSK_COLLISION:
> generateSskInsert ((Integer) data, 1);
> + break;
>
> case CHECK_TIMEOUTS:
> checkTimeouts();
> @@ -430,7 +427,6 @@
> }
> }
>
> - // Each EventTarget class has its own event codes
> public final static int REQUEST_CHK = 1;
> public final static int INSERT_CHK = 2;
> public final static int REQUEST_SSK = 3;
>
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01
> 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01
> 17:11:27 UTC (rev 10775)
> @@ -4,7 +4,7 @@
> import java.util.Iterator;
> import java.util.HashSet;
>
> -public class Peer
> +public class Peer implements EventTarget
> {
> private Node node; // The local node
> public int address; // The remote node's address
> @@ -18,10 +18,10 @@
> public final static double LINK_IDLE = 8.0; // RTTs without transmitting
>
> // Coalescing
> - public final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
> - public final static double MAX_DELAY = 0.1; // Coalescing delay in secs
> + private final static double MAX_DELAY = 0.1; // Max coalescing delay
> + private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
>
> - // Out-of-order delivery with eventual detection of missing packets
> + // Out-of-order delivery with duplicate detection
> public final static int SEQ_RANGE = 1000;
>
> // Sender state
> @@ -33,8 +33,9 @@
> private DeadlineQueue<Message> searchQueue; // Outgoing search messages
> private DeadlineQueue<Message> transferQueue; // Outgoing transfers
> private CongestionWindow window; // AIMD congestion window
> - private double lastTransmission = 0.0; // Clock time
> - private boolean tgif = false; // "Transfers go in first" toggle
> + private double lastTransmission = Double.POSITIVE_INFINITY; // Time
> + private int searchBytesSent = 0, transferBytesSent = 0;
> + private boolean timerRunning = false; // Coalescing timer
>
> // Receiver state
> private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
> @@ -58,8 +59,7 @@
> public void sendMessage (Message m)
> {
> m.deadline = Event.time() + MAX_DELAY;
> - if (m instanceof Block || m instanceof DataInsert
> - || m instanceof ChkDataFound) {
> + if (m instanceof Block) {
> log (m + " added to transfer queue");
> transferQueue.add (m);
> }
> @@ -67,8 +67,8 @@
> log (m + " added to search queue");
> searchQueue.add (m);
> }
> - // Start the node's timer if necessary
> - node.startTimer();
> + // Start the coalescing timer
> + startTimer();
> // Send as many packets as possible
> while (send());
> }
> @@ -78,72 +78,115 @@
> {
> log ("ack " + seq + " added to ack queue");
> ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
> - // Start the node's timer if necessary
> - node.startTimer();
> + // Start the coalescing timer
> + startTimer();
> // Send as many packets as possible
> while (send());
> }
>
> + // Start the coalescing timer
> + private void startTimer()
> + {
> + if (timerRunning) return;
> + timerRunning = true;
> + log ("starting coalescing timer");
> + Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
> + }
> +
> // Try to send a packet, return true if a packet was sent
> private boolean send()
> - {
> - if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
> - log ("nothing to send");
> - return false;
> - }
> - log (ackQueue.size + " bytes of acks in queue");
> - log (searchQueue.size + " bytes of searches in queue");
> - log (transferQueue.size + " bytes of transfers in queue");
> -
> + {
> + int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
> + log (waiting + " bytes waiting");
> + if (waiting == 0) return false;
> // Return to slow start when the link is idle
> double now = Event.time();
> if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
> lastTransmission = now;
> -
> - // Delay small packets for coalescing
> - if (now < deadline (now)) {
> - int payload = searchQueue.size + transferQueue.size;
> - log ("delaying transmission of " + payload + " bytes");
> + // How many bytes of messages can we send?
> + int available = Math.min (window.available(),
> + node.bandwidth.available());
> + log (available + " bytes available for packet");
> + // If there are no urgent acks, and no urgent messages or no
> + // room to send them, and not enough messages for a large
> + // packet or no room to send a large packet, give up!
> + if (ackQueue.deadline() > now
> + && (searchQueue.deadline() > now
> + || searchQueue.headSize() > available)
> + && (transferQueue.deadline() > now
> + || transferQueue.headSize() > available)
> + && (waiting < Packet.SENSIBLE_PAYLOAD
> + || available < Packet.SENSIBLE_PAYLOAD)) {
> + log ("not sending a packet");
> return false;
> }
> -
> + // Construct a packet
> Packet p = new Packet();
> -
> - // Put all waiting acks in the packet
> while (ackQueue.size > 0) p.addAck (ackQueue.pop());
> -
> - // Don't send sequence number n+SEQ_RANGE until sequence
> - // number n has been acked - this limits the number of
> - // sequence numbers the receiver must store for replay
> - // detection. We must still be allowed to send acks,
> - // otherwise the connection could deadlock.
> -
> - if (txSeq > txMaxSeq)
> - log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
> - else if (window.available() <= 0)
> - log ("no room in congestion window for messages");
> - else if (node.bandwidth.available() <= 0)
> - log ("no bandwidth available for messages");
> - else pack (p); // OK to send data
> -
> + int space = Math.min (available, Packet.MAX_SIZE - p.size);
> + addPayload (p, space);
> // Don't send empty packets
> if (p.acks == null && p.messages == null) return false;
> -
> + // Transmit the packet
> + log ("sending packet " + p.seq + ", " + p.size + " bytes");
> + node.net.send (p, address, latency);
> + node.bandwidth.remove (p.size);
> // If the packet contains data, buffer it for retransmission
> if (p.messages != null) {
> - p.seq = txSeq++;
> p.sent = now;
> txBuffer.add (p);
> + node.startTimer(); // Start the retransmission timer
> window.bytesSent (p.size);
> }
> -
> - // Send the packet
> - log ("sending packet " + p.seq + ", " + p.size + " bytes");
> - node.net.send (p, address, latency);
> - node.bandwidth.remove (p.size);
> return true;
> }
>
> + // Allocate a payload number and add messages to a packet
> + private void addPayload (Packet p, int space)
> + {
> + log (space + " bytes available for messages");
> + if (txSeq > txMaxSeq) {
> + log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
> + return;
> + }
> + p.seq = txSeq++;
> + // Searches get priority unless transfers are starving
> + if (searchBytesSent < transferBytesSent) {
> + while (searchQueue.size > 0
> + && searchQueue.headSize() <= space) {
> + Message m = searchQueue.pop();
> + searchBytesSent += m.size();
> + space -= m.size();
> + p.addMessage (m);
> + }
> + while (transferQueue.size > 0
> + && transferQueue.headSize() <= space) {
> + Message m = transferQueue.pop();
> + transferBytesSent += m.size();
> + space -= m.size();
> + p.addMessage (m);
> + }
> + }
> + else {
> + while (transferQueue.size > 0
> + && transferQueue.headSize() <= space) {
> + Message m = transferQueue.pop();
> + transferBytesSent += m.size();
> + space -= m.size();
> + p.addMessage (m);
> + }
> + while (searchQueue.size > 0
> + && searchQueue.headSize() <= space) {
> + Message m = searchQueue.pop();
> + searchBytesSent += m.size();
> + space -= m.size();
> + p.addMessage (m);
> + }
> + }
> + if (p.messages == null) log ("no messages added");
> + else log (p.messages.size() + " messages added");
> + }
> +
> // Called by Node when a packet arrives
> public void handlePacket (Packet p)
> {
> @@ -153,26 +196,24 @@
>
> private void handleData (Packet p)
> {
> - log ("received packet " + p.seq + ", " + p.size + " bytes");
> + log ("received " + p + ", " + p.size + " bytes");
> + sendAck (p.seq);
> if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
> - log ("duplicate packet");
> - sendAck (p.seq); // Original ack may have been lost
> + log (p + " is a duplicate");
> }
> else if (p.seq == rxSeq) {
> - log ("packet in order");
> + log (p + " is in order");
> // Find the sequence number of the next missing packet
> int was = rxSeq;
> while (rxDupe.remove (++rxSeq));
> log ("rxSeq was " + was + ", now " + rxSeq);
> // Deliver the packet
> unpack (p);
> - sendAck (p.seq);
> }
> - else if (p.seq < rxSeq + SEQ_RANGE) {
> - log ("packet out of order - expected " + rxSeq);
> + else if (p.seq < rxSeq + SEQ_RANGE * 2) {
> + log (p + " is out of order - expected " + rxSeq);
> if (rxDupe.add (p.seq)) unpack (p);
> - else log ("duplicate packet");
> - sendAck (p.seq); // Original ack may have been lost
> + else log (p + " is a duplicate");
> }
> // This indicates a misbehaving sender - discard the packet
> else log ("warning: received " + p.seq + " before " + rxSeq);
> @@ -211,38 +252,11 @@
> if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
> else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
> log ("maximum sequence number " + txMaxSeq);
> - // Send as many packets as possible
> - while (send());
> + // Send as many packets a possible
> + if (timerRunning) while (send());
> + else checkDeadlines();
> }
>
> - // Add messages to a packet
> - private void pack (Packet p)
> - {
> - // Alternate between giving searches and transfers priority
> - if (tgif) {
> - // Transfers go in first
> - while (transferQueue.size > 0
> - && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
> - p.addMessage (transferQueue.pop());
> - // Fill any remaining space with searches
> - while (searchQueue.size > 0
> - && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
> - p.addMessage (searchQueue.pop());
> - tgif = false;
> - }
> - else {
> - // Searches go in first
> - while (searchQueue.size > 0
> - && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
> - p.addMessage (searchQueue.pop());
> - // Fill any remaining space with transfers
> - while (transferQueue.size > 0
> - && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
> - p.addMessage (transferQueue.pop());
> - tgif = true;
> - }
> - }
> -
> // Remove messages from a packet and deliver them to the node
> private void unpack (Packet p)
> {
> @@ -250,17 +264,13 @@
> for (Message m : p.messages) node.handleMessage (m, this);
> }
>
> - // Called by Node, returns the next coalescing or retx deadline
> - public double checkTimeouts()
> + // Check retx timeouts, return true if there are packets in flight
> + public boolean checkTimeouts()
> {
> - log ("checking timeouts");
> - // Send as many packets as possible
> - while (send());
> + log (txBuffer.size() + " packets in flight");
> + if (txBuffer.isEmpty()) return false;
>
> double now = Event.time();
> - if (txBuffer.isEmpty()) return deadline (now);
> - log (txBuffer.size() + " packets in flight");
> -
> for (Packet p : txBuffer) {
> if (now - p.sent > RTO * rtt + MAX_DELAY) {
> // Retransmission timeout
> @@ -270,51 +280,68 @@
> window.timeout (now);
> }
> }
> -
> - // Sleep for up to MAX_DELAY seconds until the next deadline
> - return Math.min (now + MAX_DELAY, deadline (now));
> + return true;
> }
>
> - // Work out when the first ack or search or transfer needs to be sent
> - private double deadline (double now)
> + // Event callback: wake up, send packets, go back to sleep
> + private void checkDeadlines()
> {
> - return Math.min (ackQueue.deadline(), dataDeadline (now));
> + // Send as many packets as possible
> + while (send());
> + // Find the next coalescing deadline - ignore message
> + // deadlines if there isn't room in the congestion window
> + // (we have to wait for an ack before sending them)
> + double dl = ackQueue.deadline();
> + if (searchQueue.headSize() <= window.available())
> + dl = Math.min (dl, searchQueue.deadline());
> + if (transferQueue.headSize() <= window.available())
> + dl = Math.min (dl, transferQueue.deadline());
> + // If there's no deadline, stop the timer
> + if (dl == Double.POSITIVE_INFINITY) {
> + if (timerRunning) {
> + log ("stopping coalescing timer");
> + timerRunning = false;
> + }
> + return;
> + }
> + // Schedule the next check
> + double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
> + if (waitingForBandwidth()) {
> + log ("waiting for bandwidth");
> + sleep = MIN_SLEEP; // Poll the bandwidth limiter
> + }
> + timerRunning = true;
> + log ("sleeping for " + sleep + " seconds");
> + Event.schedule (this, sleep, CHECK_DEADLINES, null);
> }
>
> - // Work out when the first search or transfer needs to be sent
> - private double dataDeadline (double now)
> + // Are there any messages blocked by the bandwidth limiter?
> + private boolean waitingForBandwidth()
> {
> - // If there's no data waiting, use the ack deadline
> - if (searchQueue.size + transferQueue.size == 0)
> - return Double.POSITIVE_INFINITY;
> -
> - double deadline = Math.min (searchQueue.deadline(),
> - transferQueue.deadline());
> -
> - // Delay small packets until the coalescing deadline
> - if (searchQueue.size + transferQueue.size
> - < Packet.SENSIBLE_PAYLOAD)
> - return deadline;
> -
> - // If there's not enough room in the window, wait for an ack
> - if (window.available() <= 0)
> - return Double.POSITIVE_INFINITY;
> -
> - // If there's not enough bandwidth, try again shortly
> - if (node.bandwidth.available() <= 0)
> - return Math.max (now + MIN_SLEEP, deadline);
> -
> - // Send a packet immediately
> - return now;
> + int bandwidth = node.bandwidth.available();
> + double now = Event.time();
> + if (searchQueue.headSize() > bandwidth
> + && searchQueue.deadline() <= now) return true;
> + if (transferQueue.headSize() > bandwidth
> + && transferQueue.deadline() <= now) return true;
> + return false;
> }
>
> public void log (String message)
> {
> - // Event.log (node.net.address + ":" + address + " " + message);
> + Event.log (node.net.address + ":" + address + " " + message);
> }
>
> public String toString()
> {
> return Integer.toString (address);
> }
> +
> + // EventTarget interface
> + public void handleEvent (int type, Object data)
> + {
> + if (type == CHECK_DEADLINES) checkDeadlines();
> + }
> +
> + private final static int CHECK_DEADLINES = 1;
> }
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> 2006-11-01 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> 2006-11-01 17:11:27 UTC (rev 10775)
> @@ -261,7 +261,6 @@
> }
> }
>
> - // Each EventTarget class has its own event codes
> private final static int ACCEPTED_TIMEOUT = 1;
> private final static int SEARCH_TIMEOUT = 2;
> private final static int DATA_TIMEOUT = 3;
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> 2006-11-01 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> 2006-11-01 17:11:27 UTC (rev 10775)
> @@ -132,7 +132,6 @@
> }
> }
>
> - // Each EventTarget class has its own event codes
> protected final static int ACCEPTED_TIMEOUT = 1;
> protected final static int SEARCH_TIMEOUT = 2;
> protected final static int TRANSFER_TIMEOUT = 3;
>
> Modified:
> trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> 2006-11-01 10:00:25 UTC (rev 10774)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> 2006-11-01 17:11:27 UTC (rev 10775)
> @@ -213,7 +213,6 @@
> }
> }
>
> - // Each EventTarget class has its own event codes
> private final static int KEY_TIMEOUT = 1;
> private final static int ACCEPTED_TIMEOUT = 2;
> private final static int SEARCH_TIMEOUT = 3;
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20061101/648be1b2/attachment.pgp>