On Thursday 20 December 2007 00:25, Matthew Toseland wrote:
> You need to call .setMatchesDroppedConnection()
> and .setMatchesRestartedConnections() on the filter.
Sorry, no you don't, it's on by default.
>
> On Wednesday 19 December 2007 19:31, robert at freenetproject.org wrote:
> > Author: robert
> > Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
> > New Revision: 16733
> >
> > Modified:
> > trunk/freenet/src/freenet/node/CHKInsertSender.java
> > Log:
> > use message callback rather than hanging onto thread
> >
> >
> > Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> > ===================================================================
> > --- trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19
19:10:21
> UTC (rev 16732)
> > +++ trunk/freenet/src/freenet/node/CHKInsertSender.java 2007-12-19
19:31:05
> UTC (rev 16733)
> > @@ -6,6 +6,7 @@
> > import java.util.HashSet;
> > import java.util.Vector;
> >
> > +import freenet.io.comm.AsyncMessageFilterCallback;
> > import freenet.io.comm.ByteCounter;
> > import freenet.io.comm.DMT;
> > import freenet.io.comm.DisconnectedException;
> > @@ -24,7 +25,7 @@
> >
> > public final class CHKInsertSender implements Runnable, AnyInsertSender,
> ByteCounter {
> >
> > - private class BackgroundTransfer implements Runnable {
> > + private class BackgroundTransfer implements Runnable,
> AsyncMessageFilterCallback {
> > /** Node we are waiting for response from */
> > final PeerNode pn;
> > /** We may be sending data to that node */
> > @@ -42,6 +43,8 @@
> > /** Did it succeed? */
> > boolean transferSucceeded;
> >
> > + long transferCompletedTime;
> > +
> > BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
> > this.pn = pn;
> > bt = new BlockTransmitter(node.usm, pn, uid, prb,
> > node.outputThrottle,
> CHKInsertSender.this);
> > @@ -56,7 +59,13 @@
> > try {
> > bt.send(node.executor);
> >
> > this.completedTransfer(bt.failedDueToOverload());
> > -
> > this.receivedNotice(waitForReceivedNotification(this));
> > + if (pn.isRoutable() && transferSucceeded) {
> > + //synch-version:
> this.receivedNotice(waitForReceivedNotification(this));
> > + //Add ourselves as a listener for the
> > longterm completion message of
> this transfer, then gracefully exit.
> > +
> > node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
> > + } else {
> > + this.receivedNotice(false);
> > + }
> > } catch (Throwable t) {
> > this.completedTransfer(false);
> > this.receivedNotice(false);
> > @@ -64,10 +73,11 @@
> > }
> > }
> >
> > - void completedTransfer(boolean success) {
> > + private void completedTransfer(boolean success) {
> > synchronized(this) {
> > transferSucceeded = success;
> > completedTransfer = true;
> > + transferCompletedTime =
> > System.currentTimeMillis();
> > notifyAll();
> > }
> > synchronized(backgroundTransfers) {
> > @@ -78,11 +88,15 @@
> > }
> > }
> >
> > - void receivedNotice(boolean success) {
> > + private void receivedNotice(boolean success) {
> > synchronized(this) {
> > + if (receivedCompletionNotice) {
> > + Logger.error(this,
> > "receivedNotice("+success+"), already had
> receivedNotice("+completionSucceeded+")");
> > + } else {
> > completionSucceeded = success;
> > receivedCompletionNotice = true;
> > notifyAll();
> > + }
> > }
> > synchronized(backgroundTransfers) {
> > backgroundTransfers.notifyAll();
> > @@ -92,6 +106,39 @@
> > }
> > }
> >
> > + public void onMatched(Message m) {
> > + PeerNode pn = (PeerNode) m.getSource();
> > + // pn cannot be null, because the filters will prevent
> > garbage
> collection of the nodes
> > +
> > + if(this.pn.equals(pn)) {
> > + boolean anyTimedOut =
> > m.getBoolean(DMT.ANY_TIMED_OUT);
> > + if(anyTimedOut) {
> > +
> > CHKInsertSender.this.setTransferTimedOut();
> > + }
> > + receivedNotice(!anyTimedOut);
> > + } else {
> > + Logger.error(this, "received completion notice
> > for wrong
> node: "+pn+" != "+this.pn);
> > + }
> > + }
> > +
> > + public boolean shouldTimeout() {
> > + //AFIACS, this will still let the filter timeout, but
> > not call
> onMatched() twice.
> > + return receivedCompletionNotice;
> > + }
> > +
> > + private MessageFilter getNotificationMessageFilter() {
> > + return MessageFilter.create().setField(DMT.UID,
>
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
> > + }
> > +
> > + boolean isTimedOut() {
> > + return
>
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
> > + }
> > +
> > + public void maybeTimedOut() {
> > + if (isTimedOut()) {
> > + receivedNotice(false);
> > + }
> > + }
> > }
> >
> > CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl,
> > @@ -651,83 +698,7 @@
> > }
> > }
> > }
> > -
> > - /**
> > - * Blocks and waits for a response from the given node asto the final
> transfer status in the chain. This will be longer/after
> > - * the local block transfer is complete, as it is neccesary to include
the
> rount-trip-time in the allTransfersComplete()
> > - * function.
> > - * Returns true if received a successful notification of the downstream
> reception, false in every other case
> > - * (e.g. timeout, cancel, receiveFailed, etc).
> > - */
> > - private boolean waitForReceivedNotification(BackgroundTransfer awc) {
> > -
> > - long transfersCompletedTime =
> > System.currentTimeMillis();
> > -
> > - // Wait for acknowledgements from each node, or
> > timeouts.
> > -
> > - while(true) {
> > -
> > - synchronized(backgroundTransfers) {
> > - if(receiveFailed) return false;
> > - }
> > - // First calculate the timeout
> > - int timeout;
> > - long now = System.currentTimeMillis();
> > - timeout = (int)Math.min(Integer.MAX_VALUE,
> > (transfersCompletedTime +
> TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
> > - if(timeout <= 0) {
> > - Logger.error(this, "Timed out
> > waiting for transfers to complete
> on "+uid);
> > - setTransferTimedOut();
> > - return false;
> > - }
> > -
> > - // If disconnected, ignore.
> > - if(!awc.pn.isRoutable()) {
> > - Logger.normal(this,
> > "Disconnected: "+awc.pn+"
> in "+CHKInsertSender.this);
> > - return false;
> > - }
> > - // If transfer failed, probably won't
> > be acknowledged.
> > - if(!awc.transferSucceeded) {
> > - if (logMINOR)
> Logger.minor(this, "waitForReceivedNotification: !transferSucceeded ->
> false");
> > - return false;
> > - }
> > - // See if redundant.
> > - if(awc.receivedCompletionNotice) {
> > - return awc.completionSucceeded;
> > - }
> > -
> > - MessageFilter mf =
> > -
> > MessageFilter.create().setField(DMT.UID,
>
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
> >
> > - if(logMINOR) Logger.minor(this,
> > "Waiting for "+awc.pn.getPeer());
> > -
> > - Message m;
> > - try {
> > - m = node.usm.waitFor(mf,
> > CHKInsertSender.this);
> > - } catch (DisconnectedException e) {
> > - Logger.normal(this,
> > "Disconnected (on waitFor): "+awc.pn+"
> in "+this);
> > - return false;
> > - }
> > - if(m == null) {
> > - Logger.error(this, "Timed out
> > waiting for a final ack
> from: "+awc.pn);
> > - return false;
> > - } else {
> > - PeerNode pn = (PeerNode)
> > m.getSource();
> > - // pn cannot be null, because
> > the filters will prevent garbage
> collection of the nodes
> > -
> > - if(awc.pn.equals(pn)) {
> > - boolean
> > anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> > - if(anyTimedOut)
> > {
> > -
> > setTransferTimedOut();
> > - }
> > - return
> > !anyTimedOut;
> > - } else {
> > -
> > Logger.error(this, "received completion notice for wrong
> node: "+awc);
> > - continue;
> > - }
> > - }
> > - }
> > - }
> > -
> > /**
> > * Block until all transfers have reached a final-terminal
> > state
> (success/failure). On success this means that a
> > * successful 'received-notification' has been received.
> > @@ -751,6 +722,7 @@
> > completedTransfers = false;
> > break;
> > }
> > + transfers[i].maybeTimedOut();
> > if
> > (!transfers[i].receivedCompletionNotice) {
> > //must wait
> > completedNotifications = false;
> >
> > _______________________________________________
> > cvs mailing list
> > cvs at freenetproject.org
> > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> >
> >
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20071220/bb7e3a8e/attachment.pgp>