With such an aggressive SEND_TIMEOUT, this borders on reviving NGRouting -
it's a major change to routing, which since it hasn't been simulated will
almost certainly cause major problems. No?
On Monday 07 January 2008 20:07, robert at freenetproject.org wrote:
> Author: robert
> Date: 2008-01-07 20:07:30 +0000 (Mon, 07 Jan 2008)
> New Revision: 16959
>
> Modified:
> trunk/freenet/src/freenet/node/MessageItem.java
> trunk/freenet/src/freenet/node/PeerNode.java
> trunk/freenet/src/freenet/node/RequestSender.java
> Log:
> implement conditionalSend: sendSync-with-timeout, aborts message send
>
>
> Modified: trunk/freenet/src/freenet/node/MessageItem.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/MessageItem.java 2008-01-07 19:31:24 UTC
(rev 16958)
> +++ trunk/freenet/src/freenet/node/MessageItem.java 2008-01-07 20:07:30 UTC
(rev 16959)
> @@ -72,4 +72,8 @@
> }
> }
> }
> +
> + public boolean isForMessage(Message msg) {
> + return this.msg.equals(msg);
> + }
> }
>
> Modified: trunk/freenet/src/freenet/node/PeerNode.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/PeerNode.java 2008-01-07 19:31:24 UTC
(rev 16958)
> +++ trunk/freenet/src/freenet/node/PeerNode.java 2008-01-07 20:07:30 UTC
(rev 16959)
> @@ -16,6 +16,7 @@
> import java.util.Hashtable;
> import java.util.Iterator;
> import java.util.LinkedList;
> +import java.util.ListIterator;
> import java.util.Vector;
> import java.util.zip.DataFormatException;
> import java.util.zip.Inflater;
> @@ -968,6 +969,21 @@
> // It will wake up before the maximum coalescing delay (100ms)
> because
> // it wakes up every 100ms *anyway*.
> }
> +
> + private boolean maybeRemoveMessageFromQueue(Message removeMe) {
> + Logger.normal(this, "attempting to remove message from
send-queue: "+removeMe);
> + synchronized (messagesToSendNow) {
> + ListIterator i=messagesToSendNow.listIterator();
> + while (i.hasNext()) {
> + MessageItem it=(MessageItem)i.next();
> + if (it.isForMessage(removeMe)) {
> + i.remove();
> + return true;
> + }
> + }
> + }
> + return false;
> + }
>
> public long getMessageQueueLengthBytes() {
> long x = 0;
> @@ -1396,6 +1412,36 @@
> }
>
> /**
> + * Conceptually, send a message to this node IF it can be done
within 'timeout', returing true
> + * only after the message was sent (similiar to sendSync), and false if
the message cannot be
> + * sent to the node in that time period. As an optimization, however,
> this
function may return
> + * immediately if it is determined that the message would not leave the
node within the timeout
> + * period.
> + */
> + public boolean conditionalSend(Message req, ByteCounter ctr, long
> timeout)
throws NotConnectedException {
> + if (timeout<=0)
> + return false;
> + if
> (getMessageQueueLengthBytes()/(getThrottle().getBandwidth()+1.0) >
timeout) {
> + Logger.normal(this, "conditionalSend; pre-emptively not
> sending message
("+timeout+"ms): "+req);
> + return false;
> + }
> + SyncMessageCallback cb = new SyncMessageCallback();
> + sendAsync(req, cb, 0, ctr);
> + cb.waitForSend(timeout);
> + if (cb.done) {
> + return true;
> + } else {
> + //best-effort: remove the message from the send queue
> it is ok if we
can't prematurely
> + //remove the item (i.e. race condition / now it is
> sent), but it will
generate unclaimed messages, etc.
> + if (!maybeRemoveMessageFromQueue(req))
> + Logger.error(this, "unable to stop transmition
> of request: "+req);
> + else
> + Logger.normal(this, "removed request from queue
> for timeout: "+req);
> + return false;
> + }
> + }
> +
> + /**
> * Enqueue a message to be sent to this node and wait up to a minute for
> it
to be transmitted.
> */
> public void sendSync(Message req, ByteCounter ctr) throws
NotConnectedException {
> @@ -1426,7 +1472,7 @@
> }
> }
> if(isConnected())
> - Logger.error(this, "Waited too long for a
> blocking send on " + this + "
for " + PeerNode.this, new Exception("error"));
> + Logger.normal(this, "Waited too long for a
> blocking send on " + this
+ " for " + PeerNode.this, new Exception("error"));
> }
>
> public void acknowledged() {
>
> Modified: trunk/freenet/src/freenet/node/RequestSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 19:31:24
UTC (rev 16958)
> +++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-07 20:07:30
UTC (rev 16959)
> @@ -4,6 +4,7 @@
> package freenet.node;
>
> import java.util.HashSet;
> +import java.util.ArrayList;
>
> import freenet.crypt.CryptFormatException;
> import freenet.crypt.DSAPublicKey;
> @@ -46,6 +47,8 @@
> public final class RequestSender implements Runnable, ByteCounter {
>
> // Constants
> + //SEND_TIMEOUT is not a hard timeout, shoot low for low latency
(250-500ms?).
> + static final int SEND_TIMEOUT = 1000;
> static final int ACCEPTED_TIMEOUT = 5000;
> static final int FETCH_TIMEOUT = 120000;
> /** Wait up to this long to get a path folding reply */
> @@ -141,6 +144,7 @@
> int rejectOverloads=0;
> HashSet nodesRoutedTo = new HashSet();
> HashSet nodesNotIgnored = new HashSet();
> + ArrayList busyPeers = new ArrayList();
> while(true) {
> if(logMINOR) Logger.minor(this, "htl="+htl);
> if(htl == 0) {
> @@ -159,9 +163,24 @@
> routeAttempts++;
>
> // Route it
> + long sendTimeout = SEND_TIMEOUT;
> + boolean usingBusyPeer=false;
> PeerNode next;
> next = node.peers.closerPeer(source, nodesRoutedTo,
nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
>
> + if (next == null && !busyPeers.isEmpty()) {
> + next = (PeerNode)busyPeers.remove(0);
> + usingBusyPeer=true;
> + if (logMINOR) Logger.minor(this, "trying
> previously-found busy
peer: "+next);
> + //NOTE: if we are at this point, it is already
> presumed that the
message cannot even make it off the node to this peer in SEND_TIMEOUT, use
all the timeout we have left.
> + sendTimeout =
> FETCH_TIMEOUT-(System.currentTimeMillis()-startTime);
> + //Edge case, local request & we are running w/o
> any time left.
> + if (sendTimeout < SEND_TIMEOUT && source==null)
> {
> + if (logMINOR) Logger.minor(this,
> "increasing timeout for local
request");
> + sendTimeout = 2*SEND_TIMEOUT;
> + }
> + }
> +
> if(next == null) {
> if (logMINOR && rejectOverloads>0)
> Logger.minor(this, "no more peers, but
> overloads
("+rejectOverloads+"/"+routeAttempts+" overloaded)");
> @@ -187,11 +206,18 @@
> // So take it from when we first started to try to send the
request.
> // See comments below when handling FNPRecentlyFailed for why
we need this.
> long timeSentRequest = System.currentTimeMillis();
> -
> +
> try {
> //This is the first contact to this node
> //async is preferred, but makes ACCEPTED_TIMEOUT much more
likely for long send queues.
> - next.sendAsync(req, null, 0, this);
> + //using conditionalSend this way might actually
> approximate Q-routing
load balancing accross the network.
> + if (!next.conditionalSend(req, this, sendTimeout)) {
> + if (usingBusyPeer)
> + continue;
> + Logger.normal(this, "will try this peer
> later if no others are
available");
> + busyPeers.add(next);
> + continue;
> + }
> } catch (NotConnectedException e) {
> Logger.minor(this, "Not connected");
> continue;
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20080110/ec063106/attachment.pgp>