It's a nice simplification... but it'll use a lot of threads.
How about asynchronous callbacks?
On Wednesday 19 December 2007 17:27, you wrote:
> Author: robert
> Date: 2007-12-19 17:27:36 +0000 (Wed, 19 Dec 2007)
> New Revision: 16725
>
> Modified:
> trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> Wait for transfer ack's from background transfers independently (might catch
more)
>
>
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19
> 16:44:52
UTC (rev 16724)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19
> 17:27:36
UTC (rev 16725)
> @@ -56,8 +56,10 @@
> try {
> bt.send(node.executor);
>
> this.completedTransfer(bt.failedDueToOverload());
> +
> this.receivedNotice(waitForReceivedNotification(this));
> } catch (Throwable t) {
> this.completedTransfer(false);
> + this.receivedNotice(false);
> Logger.error(this, "Caught "+t, t);
> }
> }
> @@ -617,12 +619,12 @@
> return sentRequest;
> }
>
> - public void waitForBackgroundTransferCompletions() {
> + private void waitForBackgroundTransferCompletions() {
> try {
> freenet.support.Logger.OSThread.logPID(this);
> if(logMINOR) Logger.minor(this, "Starting "+this);
>
> - // We are presently at a terminal stage.
> + // We must presently be at such a stage that no more
> background
transfers will be added.
>
> BackgroundTransfer[] transfers;
> synchronized(backgroundTransfers) {
> @@ -631,10 +633,26 @@
> }
>
> // Wait for the outgoing transfers to complete.
> - if(!waitForCompletedTransfers(transfers)) {
> + if(!waitForBackgroundTransfers(transfers)) {
> setTransferTimedOut();
> return;
> }
> + } finally {
> + synchronized(CHKInsertSender.this) {
> + allTransfersCompleted = true;
> + CHKInsertSender.this.notifyAll();
> + }
> + }
> + }
> +
> + /**
> + * Blocks and waits for a response from the given node asto the final
transfer status in the chain. This will be longer/after
> + * the local block transfer is complete, as it is neccesary to include
> the
rount-trip-time in the allTransfersComplete()
> + * function.
> + * Returns true if received a successful notification of the downstream
reception, false in every other case
> + * (e.g. timeout, cancel, receiveFailed, etc).
> + */
> + private boolean waitForReceivedNotification(BackgroundTransfer awc) {
>
> long transfersCompletedTime =
> System.currentTimeMillis();
>
> @@ -643,7 +661,7 @@
> while(true) {
>
> synchronized(backgroundTransfers) {
> - if(receiveFailed) return;
> + if(receiveFailed) return false;
> }
> // First calculate the timeout
> int timeout;
> @@ -652,104 +670,92 @@
> if(timeout <= 0) {
> Logger.error(this, "Timed out
> waiting for transfers to complete
on "+uid);
> setTransferTimedOut();
> - return;
> + return false;
> }
>
> - MessageFilter mf = null;
> -
> - //Build a message filter to capture
> acknowledgement messages from the
nodes we are interested in.
> - for(int i=0;i<transfers.length;i++) {
> - BackgroundTransfer awc = transfers[i];
> // If disconnected, ignore.
> if(!awc.pn.isRoutable()) {
> Logger.normal(this,
> "Disconnected: "+awc.pn+"
in "+CHKInsertSender.this);
> - continue;
> + return false;
> }
> // If transfer failed, probably won't
> be acknowledged.
> if(!awc.transferSucceeded) {
> - continue;
> + if (logMINOR)
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded ->
false");
> + return false;
> }
> - // Wait for completion.
> - if(!awc.receivedCompletionNotice) {
> - MessageFilter m =
> + // See if redundant.
> + if(awc.receivedCompletionNotice) {
> + return awc.completionSucceeded;
> + }
> +
> + MessageFilter mf =
>
> MessageFilter.create().setField(DMT.UID,
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
> - if(mf == null)
> - mf = m;
> - else
> - mf = m.or(mf);
> +
> if(logMINOR) Logger.minor(this,
> "Waiting for "+awc.pn.getPeer());
> - }
> - }
>
> - if (mf==null) {
> - if (logMINOR) Logger.minor(this, "Done
> waiting, no more completion
listeners");
> - return;
> - } else {
> Message m;
> try {
> m = node.usm.waitFor(mf,
> CHKInsertSender.this);
> } catch (DisconnectedException e) {
> - // Which one? I have no idea.
> - // Go around the loop again to
> find out.
> - continue;
> + Logger.normal(this,
> "Disconnected (on waitFor): "+awc.pn+"
in "+this);
> + return false;
> }
> if(m == null) {
> - Logger.error(this, "Timed out
> waiting for a final ack from any
nodes.");
> - //Would looping again help? We
> could either:
> - // (1) loop and notice that
> there is no more time left (handling the
timeout), or
> - // (2) notice that the nodes we
> are waiting on are down and possibly
exit gracefully (not implemented).
> - continue;
> + Logger.error(this, "Timed out
> waiting for a final ack
from: "+awc.pn);
> + return false;
> } else {
> - // Process message
> PeerNode pn = (PeerNode)
> m.getSource();
> // pn cannot be null, because
> the filters will prevent garbage
collection of the nodes
> - boolean processed = false;
> - for(int
> i=0;i<transfers.length;i++) {
> - PeerNode p =
> transfers[i].pn;
> - if(p == pn) {
> +
> + if(awc.pn.equals(pn)) {
> boolean
> anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> -
> transfers[i].receivedNotice(!anyTimedOut);
> if(anyTimedOut)
> {
>
> setTransferTimedOut();
> }
> - processed =
> true;
> - break;
> + return
> !anyTimedOut;
> + } else {
> +
> Logger.error(this, "received completion notice for wrong
node: "+awc);
> + continue;
> }
> - }
> - if(!processed) {
> - Logger.error(this, "Did
> not process message: "+m+" on "+this);
> - }
> }
> }
> - }
> - } finally {
> - synchronized(CHKInsertSender.this) {
> - allTransfersCompleted = true;
> - CHKInsertSender.this.notifyAll();
> - }
> - }
> }
>
> - /** Block until all transfers have finished. @return True if
> there is any
point in waiting for acknowledgements. */
> - private boolean waitForCompletedTransfers(BackgroundTransfer[]
> transfers)
{
> + /**
> + * Block until all transfers have reached a final-terminal
> state
(success/failure). On success this means that a
> + * successful 'received-notification' has been received.
> + * @return True if all background transfers were successful.
> + */
> + private boolean waitForBackgroundTransfers(BackgroundTransfer[]
transfers) {
> // MAYBE all done
> while(true) {
> + //If we want to be sure to exit as-soon-as the
> transfers are done, then
we must hold the lock while we check.
> + synchronized(backgroundTransfers) {
> + if(receiveFailed) return false;
> +
> boolean noneRouteable = true;
> boolean completedTransfers = true;
> + boolean completedNotifications = true;
> for(int i=0;i<transfers.length;i++) {
> if(!transfers[i].pn.isRoutable())
> continue;
> noneRouteable = false;
> if(!transfers[i].completedTransfer) {
> + //must wait
> completedTransfers = false;
> break;
> }
> + if
> (!transfers[i].receivedCompletionNotice) {
> + //must wait
> + completedNotifications = false;
> + break;
> + }
> + if (!transfers[i].completionSucceeded)
> + return false;
> }
> - if(completedTransfers) return true;
> if(noneRouteable) return false;
> + if(completedTransfers &&
> completedNotifications) return true;
>
> - synchronized(backgroundTransfers) {
> - if(receiveFailed) return false;
> - if(logMINOR) Logger.minor(this,
> "Waiting for completion");
> + if(logMINOR) Logger.minor(this,
> "Waiting for
(completion="+!completedTransfers+",
notification="+completedNotifications+")");
> try {
>
> backgroundTransfers.wait(100*1000);
> } catch (InterruptedException e) {
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20071219/50b32c93/attachment.pgp>