With such an aggressive SEND_TIMEOUT, this borders on reviving NGRouting -
it's a major change to routing which, since it hasn't been simulated, will
almost certainly cause major problems. No?

On Monday 07 January 2008 20:07, robert at freenetproject.org wrote:
> Author: robert
> Date: 2008-01-07 20:07:30 +0000 (Mon, 07 Jan 2008)
> New Revision: 16959
> 
> Modified:
>    trunk/freenet/src/freenet/node/MessageItem.java
>    trunk/freenet/src/freenet/node/PeerNode.java
>    trunk/freenet/src/freenet/node/RequestSender.java
> Log:
> implement conditionalSend: sendSync-with-timeout, aborts message send
> 
> 
> Modified: trunk/freenet/src/freenet/node/MessageItem.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/MessageItem.java   2008-01-07 19:31:24 UTC 
(rev 16958)
> +++ trunk/freenet/src/freenet/node/MessageItem.java   2008-01-07 20:07:30 UTC 
(rev 16959)
> @@ -72,4 +72,8 @@
>                       }
>               }
>       }
> +     
> +     public boolean isForMessage(Message msg) {
> +             return this.msg.equals(msg);
> +     }
>  }
> 
> Modified: trunk/freenet/src/freenet/node/PeerNode.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/PeerNode.java      2008-01-07 19:31:24 UTC 
(rev 16958)
> +++ trunk/freenet/src/freenet/node/PeerNode.java      2008-01-07 20:07:30 UTC 
(rev 16959)
> @@ -16,6 +16,7 @@
>  import java.util.Hashtable;
>  import java.util.Iterator;
>  import java.util.LinkedList;
> +import java.util.ListIterator;
>  import java.util.Vector;
>  import java.util.zip.DataFormatException;
>  import java.util.zip.Inflater;
> @@ -968,6 +969,21 @@
>               // It will wake up before the maximum coalescing delay (100ms) 
> because
>               // it wakes up every 100ms *anyway*.
>       }
> +     
> +     private boolean maybeRemoveMessageFromQueue(Message removeMe) {
> +             Logger.normal(this, "attempting to remove message from 
send-queue: "+removeMe);
> +             synchronized (messagesToSendNow) {
> +                     ListIterator i=messagesToSendNow.listIterator();
> +                     while (i.hasNext()) {
> +                             MessageItem it=(MessageItem)i.next();
> +                             if (it.isForMessage(removeMe)) {
> +                                     i.remove();
> +                                     return true;
> +                             }
> +                     }
> +             }
> +             return false;
> +     }
>  
>       public long getMessageQueueLengthBytes() {
>               long x = 0;
> @@ -1396,6 +1412,36 @@
>       }
>  
>       /**
> +      * Conceptually, send a message to this node IF it can be done 
within 'timeout', returing true
> +      * only after the message was sent (similiar to sendSync), and false if 
the message cannot be
> +      * sent to the node in that time period. As an optimization, however, 
> this 
function may return
> +      * immediately if it is determined that the message would not leave the 
node within the timeout
> +      * period.
> +      */
> +     public boolean conditionalSend(Message req, ByteCounter ctr, long 
> timeout) 
throws NotConnectedException {
> +             if (timeout<=0)
> +                     return false;
> +             if 
> (getMessageQueueLengthBytes()/(getThrottle().getBandwidth()+1.0) > 
timeout) {
> +                     Logger.normal(this, "conditionalSend; pre-emptively not 
> sending message 
("+timeout+"ms): "+req);
> +                     return false;
> +             }
> +             SyncMessageCallback cb = new SyncMessageCallback();
> +             sendAsync(req, cb, 0, ctr);
> +             cb.waitForSend(timeout);
> +             if (cb.done) {
> +                     return true;
> +             } else {
> +                     //best-effort: remove the message from the send queue 
> it is ok if we 
can't prematurely
> +                     //remove the item (i.e. race condition / now it is 
> sent), but it will 
generate unclaimed messages, etc.
> +                     if (!maybeRemoveMessageFromQueue(req))
> +                             Logger.error(this, "unable to stop transmition 
> of request: "+req);
> +                     else
> +                             Logger.normal(this, "removed request from queue 
> for timeout: "+req);
> +                     return false;
> +             }
> +     }
> +     
> +     /**
>       * Enqueue a message to be sent to this node and wait up to a minute for 
> it 
to be transmitted.
>       */
>       public void sendSync(Message req, ByteCounter ctr) throws 
NotConnectedException {
> @@ -1426,7 +1472,7 @@
>                               }
>                       }
>                       if(isConnected())
> -                             Logger.error(this, "Waited too long for a 
> blocking send on " + this + " 
for " + PeerNode.this, new Exception("error"));
> +                             Logger.normal(this, "Waited too long for a 
> blocking send on " + this 
+ " for " + PeerNode.this, new Exception("error"));
>               }
>  
>               public void acknowledged() {
> 
> Modified: trunk/freenet/src/freenet/node/RequestSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 19:31:24 
UTC (rev 16958)
> +++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 20:07:30 
UTC (rev 16959)
> @@ -4,6 +4,7 @@
>  package freenet.node;
>  
>  import java.util.HashSet;
> +import java.util.ArrayList;
>  
>  import freenet.crypt.CryptFormatException;
>  import freenet.crypt.DSAPublicKey;
> @@ -46,6 +47,8 @@
>  public final class RequestSender implements Runnable, ByteCounter {
>  
>      // Constants
> +     //SEND_TIMEOUT is not a hard timeout, shoot low for low latency 
(250-500ms?).
> +     static final int SEND_TIMEOUT = 1000;
>      static final int ACCEPTED_TIMEOUT = 5000;
>      static final int FETCH_TIMEOUT = 120000;
>      /** Wait up to this long to get a path folding reply */
> @@ -141,6 +144,7 @@
>               int rejectOverloads=0;
>          HashSet nodesRoutedTo = new HashSet();
>          HashSet nodesNotIgnored = new HashSet();
> +             ArrayList busyPeers = new ArrayList();
>          while(true) {
>              if(logMINOR) Logger.minor(this, "htl="+htl);
>              if(htl == 0) {
> @@ -159,9 +163,24 @@
>                       routeAttempts++;
>              
>              // Route it
> +                     long sendTimeout = SEND_TIMEOUT;
> +                     boolean usingBusyPeer=false;
>              PeerNode next;
>              next = node.peers.closerPeer(source, nodesRoutedTo, 
nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
>              
> +                     if (next == null && !busyPeers.isEmpty()) {
> +                             next = (PeerNode)busyPeers.remove(0);
> +                             usingBusyPeer=true;
> +                             if (logMINOR) Logger.minor(this, "trying 
> previously-found busy 
peer: "+next);
> +                             //NOTE: if we are at this point, it is already 
> presumed that the 
message cannot even make it off the node to this peer in SEND_TIMEOUT, use 
all the timeout we have left.
> +                             sendTimeout = 
> FETCH_TIMEOUT-(System.currentTimeMillis()-startTime);
> +                             //Edge case, local request & we are running w/o 
> any time left.
> +                             if (sendTimeout < SEND_TIMEOUT && source==null) 
> {
> +                                     if (logMINOR) Logger.minor(this, 
> "increasing timeout for local 
request");
> +                                     sendTimeout = 2*SEND_TIMEOUT;
> +                             }
> +                     }
> +                     
>              if(next == null) {
>                               if (logMINOR && rejectOverloads>0)
>                                       Logger.minor(this, "no more peers, but 
> overloads 
("+rejectOverloads+"/"+routeAttempts+" overloaded)");
> @@ -187,11 +206,18 @@
>              // So take it from when we first started to try to send the 
request.
>              // See comments below when handling FNPRecentlyFailed for why 
we need this.
>              long timeSentRequest = System.currentTimeMillis();
> -            
> +                     
>              try {
>               //This is the first contact to this node
>               //async is preferred, but makes ACCEPTED_TIMEOUT much more 
likely for long send queues.
> -             next.sendAsync(req, null, 0, this);
> +                             //using conditionalSend this way might actually 
> approximate Q-routing 
load balancing accross the network.
> +             if (!next.conditionalSend(req, this, sendTimeout)) {
> +                                     if (usingBusyPeer)
> +                                             continue;
> +                                     Logger.normal(this, "will try this peer 
> later if no others are 
available");
> +                                     busyPeers.add(next);
> +                                     continue;
> +                             }
>              } catch (NotConnectedException e) {
>               Logger.minor(this, "Not connected");
>               continue;
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20080110/ec063106/attachment.pgp>

Reply via email to