Nice. Are you fairly sure this works? Have you done at least some testing on 
it? I have half a mind to say we should revert it until after alpha 2 (this 
week), in the interests of stability, you see...

On Monday 21 January 2008 19:17, robert at freenetproject.org wrote:
> Author: robert
> Date: 2008-01-21 19:17:14 +0000 (Mon, 21 Jan 2008)
> New Revision: 17190
> 
> Modified:
>    trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
>    trunk/freenet/src/freenet/node/RequestHandler.java
>    trunk/freenet/src/freenet/node/RequestSender.java
> Log:
> use callback for requestSender status; now only 1 thread per-request (rather 
than 2)
> 
> 
> Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
> ===================================================================
> --- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java   2008-01-21 
19:10:35 UTC (rev 17189)
> +++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java   2008-01-21 
19:17:14 UTC (rev 17190)
> @@ -63,6 +63,8 @@
>       final DoubleTokenBucket _masterThrottle;
>       final ByteCounter _ctr;
>       final int PACKET_SIZE;
> +     private boolean asyncExitStatus;
> +     private boolean asyncExitStatusSet;
>       
>       public BlockTransmitter(MessageCore usm, PeerContext destination, long 
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle, 
ByteCounter ctr) {
>               _usm = usm;
> @@ -289,7 +291,16 @@
>        */
>       public void sendAsync(final Executor executor) {
>               executor.execute(new Runnable() {
> -                     public void run() { send(executor); } },
> +                     public void run() {
> +                                              try {
> +                                                 
> asyncExitStatus=send(executor);
> +                                              } finally {
> +                                                 synchronized 
> (BlockTransmitter.this) {
> +                                                    asyncExitStatusSet=true;
> +                                                    
> BlockTransmitter.this.notifyAll();
> +                                                 }
> +                                              }
> +                                     } },
>                       "BlockTransmitter:sendAsync() for "+this);
>       }
>  
> @@ -304,6 +315,19 @@
>                       }
>               }
>       }
> +     
> +     public boolean getAsyncExitStatus() {
> +             synchronized (this) {
> +                     while (!asyncExitStatusSet) {
> +                             try {
> +                                     this.wait(10*1000);
> +                             } catch (InterruptedException e) {
> +                                     //ignore
> +                             }
> +                     }
> +             }
> +             return asyncExitStatus;
> +     }
>  
>       public PeerContext getDestination() {
>               return _destination;
> 
> Modified: trunk/freenet/src/freenet/node/RequestHandler.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestHandler.java        2008-01-21 
> 19:10:35 
UTC (rev 17189)
> +++ trunk/freenet/src/freenet/node/RequestHandler.java        2008-01-21 
> 19:17:14 
UTC (rev 17190)
> @@ -28,7 +28,7 @@
>   * is separated off into RequestSender so we get transfer coalescing
>   * and both ends for free. 
>   */
> -public class RequestHandler implements Runnable, ByteCounter {
> +public class RequestHandler implements Runnable, ByteCounter, 
RequestSender.Listener {
>  
>       private static boolean logMINOR;
>      final Message req;
> @@ -45,6 +45,10 @@
>      private RequestSender rs;
>      private int status = RequestSender.NOT_FINISHED;
>       private boolean appliedByteCounts=false;
> +     private boolean sentRejectedOverload = false;
> +     private long searchStartTime;
> +     private long responseDeadline;
> +     private BlockTransmitter bt;    
>       
>      public String toString() {
>          return super.toString()+" for "+uid;
> @@ -76,12 +80,14 @@
>           freenet.support.Logger.OSThread.logPID(this);
>          try {
>               realRun();
> +                     //The last thing that realRun() does is register as a 
> request-sender 
listener, so any exception here is the end.
>          } catch (NotConnectedException e) {
> -             // Ignore, normal
> +             Logger.normal(this, "requestor gone, could not start request 
handler wait");
> +                     node.removeTransferringRequestHandler(uid);
> +            node.unlockUID(uid, key instanceof NodeSSK, false, false);
>          } catch (Throwable t) {
>              Logger.error(this, "Caught "+t, t);
> -        } finally {
> -             node.removeTransferringRequestHandler(uid);
> +                     node.removeTransferringRequestHandler(uid);
>              node.unlockUID(uid, key instanceof NodeSSK, false, false);
>          }
>      }
> @@ -150,26 +156,16 @@
>              return;
>          }
>          
> -        boolean shouldHaveStartedTransfer = false;
> -        boolean sentRejectedOverload = false;
> +        //If we cannot respond before this time, the 'source' node has 
already fatally timed out (and we need not return packets which will not be 
claimed)
> +             searchStartTime = System.currentTimeMillis();
> +             responseDeadline = searchStartTime + 
> RequestSender.FETCH_TIMEOUT + 
source.getProbableSendQueueTime();
> +        
> +        rs.addListener(this);
> +     }
>               
> -             //If we cannot respond before this time, the 'source' node has 
> already 
fatally timed out (and we need not return packets which will not be claimed)
> -             long searchStartTime = System.currentTimeMillis();
> -             long responseDeadline = searchStartTime + 
> RequestSender.FETCH_TIMEOUT + 
source.getProbableSendQueueTime();
> -        short waitStatus = 0;
> -        
> -        while(true) {
> -            
> -             waitStatus = rs.waitUntilStatusChange(waitStatus);
> -                     long now = System.currentTimeMillis();
> -                     
> -                     if (now > responseDeadline) {
> -                             Logger.error(this, "requestsender took too long 
> to respond to requestor 
("+TimeUtil.formatTime((now - searchStartTime), 2, 
true)+"/"+rs.getStatus()+")"); 
> -                             applyByteCounts();
> -                             return;
> -                     }
> -                     
> -            if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0 
&& !sentRejectedOverload) {
> +     public void onReceivedRejectOverload() {
> +             try {
> +            if(!sentRejectedOverload) {
>               // Forward RejectedOverload
>                               //Note: This message is only decernable from 
> the terminal messages by 
the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
>               Message msg = DMT.createFNPRejectedOverload(uid, false);
> @@ -177,17 +173,30 @@
>                               //If the status changes (e.g. to SUCCESS), 
> there is little need to send 
yet another reject overload.
>                               sentRejectedOverload=true;
>              }
> -            
> -            if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
> +             } catch (NotConnectedException e) {
> +                     Logger.normal(this, "requestor is gone, can't forward 
> reject overload");
> +             }
> +     }
> +     
> +     public void onCHKTransferBegins() {
> +             try {
>               // Is a CHK.
>                  Message df = DMT.createFNPCHKDataFound(uid, 
rs.getHeaders());
>                  source.sendAsync(df, null, 0, this);
>                  
>                  PartiallyReceivedBlock prb = rs.getPRB();
> -             BlockTransmitter bt =
> +             bt =
>                   new BlockTransmitter(node.usm, source, uid, prb, 
node.outputThrottle, this);
>               node.addTransferringRequestHandler(uid);
> -             if(bt.send(node.executor)) {
> +                     bt.sendAsync(node.executor);
> +             } catch (NotConnectedException e) {
> +                     Logger.normal(this, "requestor is gone, can't begin CHK 
> transfer");
> +             }
> +     }
> +     
> +     private void waitAndFinishCHKTransfer() throws NotConnectedException {
> +             if (logMINOR) Logger.minor(this, "Waiting for CHK transfer to 
> finish");
> +             if(bt.getAsyncExitStatus()) {
>                                       status = rs.getStatus();
>                               // Successful CHK transfer, maybe path fold
>                               finishOpennetChecked();
> @@ -196,14 +205,24 @@
>                                       status = rs.getStatus();
>                                       //for byte logging, since the block is 
> the 'terminal' message.
>                                       applyByteCounts();
> +                                     unregisterRequestHandlerWithNode();
>                               }
> -                 return;
> -            }
> +     }
> +     
> +     public void onRequestSenderFinished(int status) {
> +             long now = System.currentTimeMillis();
> +             
> +             if (now > responseDeadline) {
> +                     Logger.error(this, "requestsender took too long to 
> respond to requestor 
("+TimeUtil.formatTime((now - searchStartTime), 2, 
true)+"/"+rs.getStatus()+")"); 
> +                     applyByteCounts();
> +                     unregisterRequestHandlerWithNode();
> +                     return;
> +             }
> +     
> +             if(status == RequestSender.NOT_FINISHED)
> +                     Logger.error(this, "onFinished() but not finished?");
>              
> -            status = rs.getStatus();
> -
> -            if(status == RequestSender.NOT_FINISHED) continue;
> -            
> +             try {
>              switch(status) {
>               case RequestSender.NOT_FINISHED:
>               case RequestSender.DATA_NOT_FOUND:
> @@ -239,45 +258,56 @@
>                               } else {
>                                       sendTerminal(df);
>                               }
> -                             return;
>                       } else {
> -                             if(!rs.transferStarted()) {
> +                             if(bt == null) {
>                                       // Bug! This is impossible!
>                                       Logger.error(this, "Status is SUCCESS 
> but we never started 
a transfer on "+uid);
> -                                     // Could be a wierd synchronization 
> bug, but we don't want 
to wait forever, so treat it as overload.
> +                                     // Obviously this node is confused, 
> send a terminal reject 
to make sure the requestor is not waiting forever.
>                           reject = DMT.createFNPRejectedOverload(uid, true);
>                               sendTerminal(reject);
> -                             return;
>                               } else {
> -                                     // Race condition. We need to go around 
> the loop again and 
pick up the data transfer
> -                                     // in waitStatus.
> +                                     waitAndFinishCHKTransfer();
>                               }
> -                             // Either way, go back around the loop.
> -                             continue;
>                       }
> +                                     return;
>               case RequestSender.VERIFY_FAILURE:
>                       if(key instanceof NodeCHK) {
> -                             if(shouldHaveStartedTransfer)
> -                                     throw new IllegalStateException("Got 
> status code "+status+" 
but transfer not started");
> -                             shouldHaveStartedTransfer = true;
> -                             continue; // should have started transfer
> +                                             if(bt == null) {
> +                                     // Bug! This is impossible!
> +                                     Logger.error(this, "Status is 
> VERIFY_FAILURE but we never 
started a transfer on "+uid);
> +                                                     // Obviously this node 
> is confused, send a terminal reject to make 
sure the requestor is not waiting forever.
> +                         reject = DMT.createFNPRejectedOverload(uid, true);
> +                             sendTerminal(reject);
> +                             } else {
> +                                                     //Verify fails after 
> receive() is complete, so we might as well 
propagate it...
> +                                     waitAndFinishCHKTransfer();
> +                             }
> +                                             return;
>                       }
>                   reject = DMT.createFNPRejectedOverload(uid, true);
>                       sendTerminal(reject);
>                       return;
>               case RequestSender.TRANSFER_FAILED:
>                       if(key instanceof NodeCHK) {
> -                             if(shouldHaveStartedTransfer)
> -                                     throw new IllegalStateException("Got 
> status code "+status+" 
but transfer not started");
> -                             shouldHaveStartedTransfer = true;
> -                             continue; // should have started transfer
> +                             if(bt == null) {
> +                                     // Bug! This is impossible!
> +                                     Logger.error(this, "Status is 
> TRANSFER_FAILED but we never 
started a transfer on "+uid);
> +                                                     // Obviously this node 
> is confused, send a terminal reject to make 
sure the requestor is not waiting forever.
> +                         reject = DMT.createFNPRejectedOverload(uid, true);
> +                             sendTerminal(reject);
> +                             } else {
> +                                     waitAndFinishCHKTransfer();
> +                             }
> +                                             return;
>                       }
> -                     // Other side knows, right?
> +                     Logger.error(this, "finish(TRANSFER_FAILED) should not 
> be 
called on SSK?!?!");
>                       return;
>               default:
>                   throw new IllegalStateException("Unknown status 
code "+status);
>              }
> -        }
> +             } catch (NotConnectedException e) {
> +                     Logger.normal(this, "requestor is gone, can't send 
> terminal message");
> +             }
>       }
>  
>      /**
> @@ -313,10 +343,16 @@
>                       } else {
>                  //also for byte logging, since the block is the 'terminal' 
message.
>                  applyByteCounts();
> +                             unregisterRequestHandlerWithNode();
>               }
>          }
>       }
>  
> +     private void unregisterRequestHandlerWithNode() {
> +             node.removeTransferringRequestHandler(uid);
> +             node.unlockUID(uid, key instanceof NodeSSK, false, false);
> +     }
> +     
>       /**
>       * Sends the 'final' packet of a request in such a way that the thread 
can be freed (made non-runnable/exit)
>       * and the byte counter will still be accurate.
> @@ -352,6 +388,7 @@
>               public void sent() {
>              //For byte counting, this relies on the fact that the callback 
will only be excuted once.
>                       applyByteCounts();
> +                     unregisterRequestHandlerWithNode();
>          }
>       }
>      
> @@ -366,6 +403,7 @@
>                               (node.passOpennetRefsThroughDarknet() || 
> source.isOpennet()) &&
>                  finishOpennetInner(om)) {
>                       applyByteCounts();
> +                     unregisterRequestHandlerWithNode();
>                       return;
>               }
>               
> @@ -383,6 +421,7 @@
>               if(om != null && (source.isOpennet() || 
node.passOpennetRefsThroughDarknet()) &&
>                  finishOpennetNoRelayInner(om)) {
>                       applyByteCounts();
> +                     unregisterRequestHandlerWithNode();
>                       return;
>               }
>               
> 
> Modified: trunk/freenet/src/freenet/node/RequestSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:10:35 
UTC (rev 17189)
> +++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:17:14 
UTC (rev 17190)
> @@ -3,7 +3,9 @@
>   * http://www.gnu.org/ for further details of the GPL. */
>  package freenet.node;
>  
> +import java.util.ArrayList;
>  import java.util.HashSet;
> +import java.util.Iterator;
>  
>  import freenet.crypt.CryptFormatException;
>  import freenet.crypt.DSAPublicKey;
> @@ -70,6 +72,8 @@
>      private byte[] sskData;
>      private SSKBlock block;
>      private boolean hasForwarded;
> +     
> +     private ArrayList listeners=new ArrayList();
>      
>      // Terminal status
>      // Always set finished AFTER setting the reason flag
> @@ -433,7 +437,8 @@
>                               synchronized(this) {
>                                       notifyAll();
>                               }
> -                             
> +                             fireCHKTransferBegins();
> +                                             
>                               BlockReceiver br = new BlockReceiver(node.usm, 
> next, uid, 
prb, this);
>                               
>                               try {
> @@ -580,10 +585,13 @@
>       private volatile boolean hasForwardedRejectedOverload;
>      
>      /** Forward RejectedOverload to the request originator */
> -    private synchronized void forwardRejectedOverload() {
> -     if(hasForwardedRejectedOverload) return;
> -     hasForwardedRejectedOverload = true;
> -             notifyAll();
> +    private void forwardRejectedOverload() {
> +             synchronized (this) {
> +                     if(hasForwardedRejectedOverload) return;
> +                     hasForwardedRejectedOverload = true;
> +                     notifyAll();
> +             }
> +             fireReceivedRejectOverload();
>       }
>      
>      public PartiallyReceivedBlock getPRB() {
> @@ -659,20 +667,27 @@
>              if(status == SUCCESS)
>               successFrom = next;
>          }
> -        
> +             
>          if(status == SUCCESS) {
>               if(next != null) {
>                       next.onSuccess(false, key instanceof NodeSSK);
>               }
>               node.nodeStats.requestCompleted(true, source != null, key 
instanceof NodeSSK);
>               
> +                     //NOTE: because of the requesthandler implementation, 
> this will block 
and wait
> +                     //      for downstream transfers on a CHK. The opennet 
> stuff introduces
> +                     //      a delay of it's own if we don't get the 
> expected message.
> +                     fireRequestSenderFinished(code);
> +                     
>               if(key instanceof NodeCHK && next != null && 
>                               (next.isOpennet() || 
> node.passOpennetRefsThroughDarknet()) ) {
>                       finishOpennet(next);
>               } else
>                       finishOpennetNull(next);
> -        } else
> +        } else {
>               node.nodeStats.requestCompleted(false, source != null, key 
instanceof NodeSSK);
> +                     fireRequestSenderFinished(code);
> +             }
>          
>               synchronized(this) {
>                       opennetFinished = true;
> @@ -852,4 +867,72 @@
>       public boolean isLocalRequestSearch() {
>               return (source==null);
>       }
> +     
> +     interface Listener {
> +             void onReceivedRejectOverload();
> +             void onCHKTransferBegins();
> +             void onRequestSenderFinished(int status);
> +     }
> +     
> +     public void addListener(Listener l) {
> +             boolean reject=false;
> +             boolean transfer=false;
> +             int status;
> +             synchronized (this) {
> +                     synchronized (listeners) {
> +                             listeners.add(l);
> +                     }
> +                     reject=hasForwardedRejectedOverload;
> +                     transfer=transferStarted();
> +                     status=this.status;
> +             }
> +             if (reject)
> +                     l.onReceivedRejectOverload();
> +             if (transfer)
> +                     l.onCHKTransferBegins();
> +             if (status!=NOT_FINISHED)
> +                     l.onRequestSenderFinished(status);
> +     }
> +     
> +     private void fireReceivedRejectOverload() {
> +             synchronized (listeners) {
> +                     Iterator i=listeners.iterator();
> +                     while (i.hasNext()) {
> +                             Listener l=(Listener)i.next();
> +                             try {
> +                                     l.onReceivedRejectOverload();
> +                             } catch (Throwable t) {
> +                                     Logger.error(this, "Caught: "+t, t);
> +                             }
> +                     }
> +             }
> +     }
> +     
> +     private void fireCHKTransferBegins() {
> +             synchronized (listeners) {
> +                     Iterator i=listeners.iterator();
> +                     while (i.hasNext()) {
> +                             Listener l=(Listener)i.next();
> +                             try {
> +                                     l.onCHKTransferBegins();
> +                             } catch (Throwable t) {
> +                                     Logger.error(this, "Caught: "+t, t);
> +                             }
> +                     }
> +             }
> +     }
> +     
> +     private void fireRequestSenderFinished(int status) {
> +             synchronized (listeners) {
> +                     Iterator i=listeners.iterator();
> +                     while (i.hasNext()) {
> +                             Listener l=(Listener)i.next();
> +                             try {
> +                                     l.onRequestSenderFinished(status);
> +                             } catch (Throwable t) {
> +                                     Logger.error(this, "Caught: "+t, t);
> +                             }
> +                     }
> +             }
> +     }
>  }
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20080122/db2fc4e6/attachment.pgp>

Reply via email to