It's a nice simplification... but it'll use a lot of threads.

How about asynchronous callbacks?

On Wednesday 19 December 2007 17:27, you wrote:
> Author: robert
> Date: 2007-12-19 17:27:36 +0000 (Wed, 19 Dec 2007)
> New Revision: 16725
> 
> Modified:
>    trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> Wait for transfer ack's from background transfers independently (might catch more)
> 
> 
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java       2007-12-19 16:44:52 UTC (rev 16724)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java       2007-12-19 17:27:36 UTC (rev 16725)
> @@ -56,8 +56,10 @@
>                       try {
>                               bt.send(node.executor);
>                               
> this.completedTransfer(bt.failedDueToOverload());
> +                             
> this.receivedNotice(waitForReceivedNotification(this));
>                       } catch (Throwable t) {
>                               this.completedTransfer(false);
> +                             this.receivedNotice(false);
>                               Logger.error(this, "Caught "+t, t);
>                       }
>               }
> @@ -617,12 +619,12 @@
>               return sentRequest;
>       }
>               
> -             public void waitForBackgroundTransferCompletions() {
> +             private void waitForBackgroundTransferCompletions() {
>                       try {
>                   freenet.support.Logger.OSThread.logPID(this);
>                       if(logMINOR) Logger.minor(this, "Starting "+this);
>                       
> -                     // We are presently at a terminal stage.
> +                     // We must presently be at such a stage that no more 
> background 
transfers will be added.
>                       
>                       BackgroundTransfer[] transfers;
>                       synchronized(backgroundTransfers) {
> @@ -631,10 +633,26 @@
>                       }
>                       
>                       // Wait for the outgoing transfers to complete.
> -                     if(!waitForCompletedTransfers(transfers)) {
> +                     if(!waitForBackgroundTransfers(transfers)) {
>                               setTransferTimedOut();
>                               return;
>                       }
> +                     } finally {
> +                             synchronized(CHKInsertSender.this) {
> +                                     allTransfersCompleted = true;
> +                                     CHKInsertSender.this.notifyAll();
> +                             }
> +                     }
> +             }
> +     
> +     /**
> +      * Blocks and waits for a response from the given node asto the final transfer status in the chain. This will be longer/after
> +      * the local block transfer is complete, as it is neccesary to include the rount-trip-time in the allTransfersComplete()
> +      * function.
> +      * Returns true if received a successful notification of the downstream reception, false in every other case
> +      * (e.g. timeout, cancel, receiveFailed, etc).
> +      */
> +     private boolean waitForReceivedNotification(BackgroundTransfer awc) {
>                               
>                       long transfersCompletedTime = 
> System.currentTimeMillis();
>                       
> @@ -643,7 +661,7 @@
>                       while(true) {
>                               
>                               synchronized(backgroundTransfers) {
> -                                     if(receiveFailed) return;
> +                                     if(receiveFailed) return false;
>                               }
>                               // First calculate the timeout
>                               int timeout;
> @@ -652,104 +670,92 @@
>                               if(timeout <= 0) {
>                                               Logger.error(this, "Timed out 
> waiting for transfers to complete 
on "+uid);
>                                               setTransferTimedOut();
> -                                     return;
> +                                     return false;
>                               }
>                               
> -                             MessageFilter mf = null;
> -                             
> -                             //Build a message filter to capture 
> acknowledgement messages from the 
nodes we are interested in.
> -                             for(int i=0;i<transfers.length;i++) {
> -                                     BackgroundTransfer awc = transfers[i];
>                                       // If disconnected, ignore.
>                                       if(!awc.pn.isRoutable()) {
>                                               Logger.normal(this, 
> "Disconnected: "+awc.pn+" 
in "+CHKInsertSender.this);
> -                                             continue;
> +                                             return false;
>                                       }
>                                       // If transfer failed, probably won't 
> be acknowledged.
>                                       if(!awc.transferSucceeded) {
> -                                             continue;
> +                                             if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> 
false");
> +                                             return false;
>                                       }
> -                                     // Wait for completion.
> -                                     if(!awc.receivedCompletionNotice) {
> -                                             MessageFilter m =
> +                                     // See if redundant.
> +                                     if(awc.receivedCompletionNotice) {
> +                                             return awc.completionSucceeded;
> +                                     }
> +                             
> +                             MessageFilter mf =
>                                                       
> MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
> -                                             if(mf == null)
> -                                                     mf = m;
> -                                             else
> -                                                     mf = m.or(mf);
> +
>                                               if(logMINOR) Logger.minor(this, 
> "Waiting for "+awc.pn.getPeer());
> -                                     }
> -                             }
>                               
> -                             if (mf==null) {
> -                                     if (logMINOR) Logger.minor(this, "Done 
> waiting, no more completion 
listeners");
> -                                     return;
> -                             } else {
>                                       Message m;
>                                       try {
>                                               m = node.usm.waitFor(mf, 
> CHKInsertSender.this);
>                                       } catch (DisconnectedException e) {
> -                                             // Which one? I have no idea.
> -                                             // Go around the loop again to 
> find out.
> -                                             continue;
> +                                             Logger.normal(this, 
> "Disconnected (on waitFor): "+awc.pn+" 
in "+this);
> +                                             return false;
>                                       }
>                                       if(m == null) {
> -                                             Logger.error(this, "Timed out 
> waiting for a final ack from any 
nodes.");
> -                                             //Would looping again help? We 
> could either:
> -                                             // (1) loop and notice that 
> there is no more time left (handling the 
timeout), or
> -                                             // (2) notice that the nodes we 
> are waiting on are down and possibly 
exit gracefully (not implemented).
> -                                             continue;
> +                                             Logger.error(this, "Timed out 
> waiting for a final ack 
from: "+awc.pn);
> +                                             return false;
>                                       } else {
> -                                             // Process message
>                                               PeerNode pn = (PeerNode) 
> m.getSource();
>                                               // pn cannot be null, because 
> the filters will prevent garbage 
collection of the nodes
> -                                             boolean processed = false;
> -                                             for(int 
> i=0;i<transfers.length;i++) {
> -                                                     PeerNode p = 
> transfers[i].pn;
> -                                                     if(p == pn) {
> +
> +                                                     if(awc.pn.equals(pn)) {
>                                                               boolean 
> anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> -                                                             
> transfers[i].receivedNotice(!anyTimedOut);
>                                                               if(anyTimedOut) 
> {
>                                                                       
> setTransferTimedOut();
>                                                               }
> -                                                             processed = 
> true;
> -                                                             break;
> +                                                             return 
> !anyTimedOut;
> +                                                     } else {
> +                                                             
> Logger.error(this, "received completion notice for wrong 
node: "+awc);
> +                                                             continue;
>                                                       }
> -                                             }
> -                                             if(!processed) {
> -                                                     Logger.error(this, "Did 
> not process message: "+m+" on "+this);
> -                                             }
>                                       }
>                               }
> -                     }
> -                     } finally {
> -                             synchronized(CHKInsertSender.this) {
> -                                     allTransfersCompleted = true;
> -                                     CHKInsertSender.this.notifyAll();
> -                             }
> -                     }
>               }
>  
> -             /** Block until all transfers have finished. @return True if 
> there is any 
point in waiting for acknowledgements. */
> -             private boolean waitForCompletedTransfers(BackgroundTransfer[] 
> transfers) 
{
> +             /**
> +              * Block until all transfers have reached a final-terminal 
> state 
(success/failure). On success this means that a
> +              * successful 'received-notification' has been received.
> +              * @return True if all background transfers were successful.
> +              */
> +             private boolean waitForBackgroundTransfers(BackgroundTransfer[] 
transfers) {
>                       // MAYBE all done
>                       while(true) {
> +                             //If we want to be sure to exit as-soon-as the 
> transfers are done, then 
we must hold the lock while we check.
> +                             synchronized(backgroundTransfers) {
> +                                     if(receiveFailed) return false;
> +
>                               boolean noneRouteable = true;
>                               boolean completedTransfers = true;
> +                             boolean completedNotifications = true;
>                               for(int i=0;i<transfers.length;i++) {
>                                       if(!transfers[i].pn.isRoutable()) 
> continue;
>                                       noneRouteable = false;
>                                       if(!transfers[i].completedTransfer) {
> +                                             //must wait
>                                               completedTransfers = false;
>                                               break;
>                                       }
> +                                     if 
> (!transfers[i].receivedCompletionNotice) {
> +                                             //must wait
> +                                             completedNotifications = false;
> +                                             break;
> +                                     }
> +                                     if (!transfers[i].completionSucceeded)
> +                                             return false;
>                               }
> -                             if(completedTransfers) return true;
>                               if(noneRouteable) return false;
> +                             if(completedTransfers && 
> completedNotifications) return true;
>  
> -                             synchronized(backgroundTransfers) {
> -                                     if(receiveFailed) return false;
> -                                     if(logMINOR) Logger.minor(this, 
> "Waiting for completion");
> +                                     if(logMINOR) Logger.minor(this, 
> "Waiting for 
(completion="+!completedTransfers+", 
notification="+completedNotifications+")");
>                                       try {
>                                               
> backgroundTransfers.wait(100*1000);
>                                       } catch (InterruptedException e) {
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20071219/50b32c93/attachment.pgp>

Reply via email to