You need to call .setMatchesDroppedConnection() 
and .setMatchesRestartedConnections() on the filter.

On Wednesday 19 December 2007 19:31, robert at freenetproject.org wrote:
> Author: robert
> Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
> New Revision: 16733
> 
> Modified:
>    trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> use message callback rather than hanging onto thread
> 
> 
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java       2007-12-19 
> 19:10:21 
UTC (rev 16732)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java       2007-12-19 
> 19:31:05 
UTC (rev 16733)
> @@ -6,6 +6,7 @@
>  import java.util.HashSet;
>  import java.util.Vector;
>  
> +import freenet.io.comm.AsyncMessageFilterCallback;
>  import freenet.io.comm.ByteCounter;
>  import freenet.io.comm.DMT;
>  import freenet.io.comm.DisconnectedException;
> @@ -24,7 +25,7 @@
>  
>  public final class CHKInsertSender implements Runnable, AnyInsertSender, 
ByteCounter {
>       
> -     private class BackgroundTransfer implements Runnable {
> +     private class BackgroundTransfer implements Runnable, 
AsyncMessageFilterCallback {
>               /** Node we are waiting for response from */
>               final PeerNode pn;
>               /** We may be sending data to that node */
> @@ -42,6 +43,8 @@
>               /** Did it succeed? */
>               boolean transferSucceeded;
>               
> +             long transferCompletedTime;
> +             
>               BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
>                       this.pn = pn;
>                       bt = new BlockTransmitter(node.usm, pn, uid, prb, 
> node.outputThrottle, 
CHKInsertSender.this);
> @@ -56,7 +59,13 @@
>                       try {
>                               bt.send(node.executor);
>                               
> this.completedTransfer(bt.failedDueToOverload());
> -                             
> this.receivedNotice(waitForReceivedNotification(this));
> +                             if (pn.isRoutable() && transferSucceeded) {
> +                                     //synch-version: 
this.receivedNotice(waitForReceivedNotification(this));
> +                                     //Add ourselves as a listener for the 
> longterm completion message of 
this transfer, then gracefully exit.
> +                                     
> node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
> +                             } else {
> +                                     this.receivedNotice(false);
> +                             }
>                       } catch (Throwable t) {
>                               this.completedTransfer(false);
>                               this.receivedNotice(false);
> @@ -64,10 +73,11 @@
>                       }
>               }
>               
> -             void completedTransfer(boolean success) {
> +             private void completedTransfer(boolean success) {
>                       synchronized(this) {
>                               transferSucceeded = success;
>                               completedTransfer = true;
> +                             transferCompletedTime = 
> System.currentTimeMillis();
>                               notifyAll();
>                       }
>                       synchronized(backgroundTransfers) {
> @@ -78,11 +88,15 @@
>                       }
>               }
>               
> -             void receivedNotice(boolean success) {
> +             private void receivedNotice(boolean success) {
>                       synchronized(this) {
> +                             if (receivedCompletionNotice) {
> +                                     Logger.error(this, 
> "receivedNotice("+success+"), already had 
receivedNotice("+completionSucceeded+")");
> +                             } else {
>                               completionSucceeded = success;
>                               receivedCompletionNotice = true;
>                               notifyAll();
> +                             }
>                       }
>                       synchronized(backgroundTransfers) {
>                               backgroundTransfers.notifyAll();
> @@ -92,6 +106,39 @@
>                       }                       
>               }
>               
> +             public void onMatched(Message m) {
> +                     PeerNode pn = (PeerNode) m.getSource();
> +                     // pn cannot be null, because the filters will prevent 
> garbage 
collection of the nodes
> +                     
> +                     if(this.pn.equals(pn)) {
> +                             boolean anyTimedOut = 
> m.getBoolean(DMT.ANY_TIMED_OUT);
> +                             if(anyTimedOut) {
> +                                     
> CHKInsertSender.this.setTransferTimedOut();
> +                             }
> +                             receivedNotice(!anyTimedOut);
> +                     } else {
> +                             Logger.error(this, "received completion notice 
> for wrong 
node: "+pn+" != "+this.pn);
> +                     }                       
> +             }
> +             
> +             public boolean shouldTimeout() {
> +                     //AFIACS, this will still let the filter timeout, but 
> not call 
onMatched() twice.
> +                     return receivedCompletionNotice;
> +             }
> +             
> +             private MessageFilter getNotificationMessageFilter() {
> +                     return MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
> +             }
> +             
> +             boolean isTimedOut() {
> +                     return 
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
> +             }
> +             
> +             public void maybeTimedOut() {
> +                     if (isTimedOut()) {
> +                             receivedNotice(false);
> +                     }
> +             }
>       }
>       
>       CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
> @@ -651,83 +698,7 @@
>                               }
>                       }
>               }
> -     
> -     /**
> -      * Blocks and waits for a response from the given node asto the final 
transfer status in the chain. This will be longer/after
> -      * the local block transfer is complete, as it is neccesary to include 
> the 
rount-trip-time in the allTransfersComplete()
> -      * function.
> -      * Returns true if received a successful notification of the downstream 
reception, false in every other case
> -      * (e.g. timeout, cancel, receiveFailed, etc).
> -      */
> -     private boolean waitForReceivedNotification(BackgroundTransfer awc) {
> -                             
> -                     long transfersCompletedTime = 
> System.currentTimeMillis();
> -                     
> -                     // Wait for acknowledgements from each node, or 
> timeouts.
> -                     
> -                     while(true) {
> -                             
> -                             synchronized(backgroundTransfers) {
> -                                     if(receiveFailed) return false;
> -                             }
> -                             // First calculate the timeout
> -                             int timeout;
> -                             long now = System.currentTimeMillis();
> -                             timeout = (int)Math.min(Integer.MAX_VALUE, 
> (transfersCompletedTime + 
TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
> -                             if(timeout <= 0) {
> -                                             Logger.error(this, "Timed out 
> waiting for transfers to complete 
on "+uid);
> -                                             setTransferTimedOut();
> -                                     return false;
> -                             }
> -                             
> -                                     // If disconnected, ignore.
> -                                     if(!awc.pn.isRoutable()) {
> -                                             Logger.normal(this, 
> "Disconnected: "+awc.pn+" 
in "+CHKInsertSender.this);
> -                                             return false;
> -                                     }
> -                                     // If transfer failed, probably won't 
> be acknowledged.
> -                                     if(!awc.transferSucceeded) {
> -                                             if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> 
false");
> -                                             return false;
> -                                     }
> -                                     // See if redundant.
> -                                     if(awc.receivedCompletionNotice) {
> -                                             return awc.completionSucceeded;
> -                                     }
> -                             
> -                             MessageFilter mf =
> -                                                     
> MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
>  
> -                                             if(logMINOR) Logger.minor(this, 
> "Waiting for "+awc.pn.getPeer());
> -                             
> -                                     Message m;
> -                                     try {
> -                                             m = node.usm.waitFor(mf, 
> CHKInsertSender.this);
> -                                     } catch (DisconnectedException e) {
> -                                             Logger.normal(this, 
> "Disconnected (on waitFor): "+awc.pn+" 
in "+this);
> -                                             return false;
> -                                     }
> -                                     if(m == null) {
> -                                             Logger.error(this, "Timed out 
> waiting for a final ack 
from: "+awc.pn);
> -                                             return false;
> -                                     } else {
> -                                             PeerNode pn = (PeerNode) 
> m.getSource();
> -                                             // pn cannot be null, because 
> the filters will prevent garbage 
collection of the nodes
> -
> -                                                     if(awc.pn.equals(pn)) {
> -                                                             boolean 
> anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> -                                                             if(anyTimedOut) 
> {
> -                                                                     
> setTransferTimedOut();
> -                                                             }
> -                                                             return 
> !anyTimedOut;
> -                                                     } else {
> -                                                             
> Logger.error(this, "received completion notice for wrong 
node: "+awc);
> -                                                             continue;
> -                                                     }
> -                                     }
> -                             }
> -             }
> -
>               /**
>                * Block until all transfers have reached a final-terminal 
> state 
(success/failure). On success this means that a
>                * successful 'received-notification' has been received.
> @@ -751,6 +722,7 @@
>                                               completedTransfers = false;
>                                               break;
>                                       }
> +                                     transfers[i].maybeTimedOut();
>                                       if 
> (!transfers[i].receivedCompletionNotice) {
>                                               //must wait
>                                               completedNotifications = false;
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20071220/1158e1e3/attachment.pgp>

Reply via email to