Nice. Are you fairly sure this works? Have you done at least some testing on
it? I'm of half a mind to say revert it until after alpha 2 (this week) in
the interests of stability, you see...
On Monday 21 January 2008 19:17, robert at freenetproject.org wrote:
> Author: robert
> Date: 2008-01-21 19:17:14 +0000 (Mon, 21 Jan 2008)
> New Revision: 17190
>
> Modified:
> trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
> trunk/freenet/src/freenet/node/RequestHandler.java
> trunk/freenet/src/freenet/node/RequestSender.java
> Log:
> use callback for requestSender status; now only 1 thread per-request (rather
than 2)
>
>
> Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
> ===================================================================
> --- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-21
19:10:35 UTC (rev 17189)
> +++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java 2008-01-21
19:17:14 UTC (rev 17190)
> @@ -63,6 +63,8 @@
> final DoubleTokenBucket _masterThrottle;
> final ByteCounter _ctr;
> final int PACKET_SIZE;
> + private boolean asyncExitStatus;
> + private boolean asyncExitStatusSet;
>
> public BlockTransmitter(MessageCore usm, PeerContext destination, long
uid, PartiallyReceivedBlock source, DoubleTokenBucket masterThrottle,
ByteCounter ctr) {
> _usm = usm;
> @@ -289,7 +291,16 @@
> */
> public void sendAsync(final Executor executor) {
> executor.execute(new Runnable() {
> - public void run() { send(executor); } },
> + public void run() {
> + try {
> +
> asyncExitStatus=send(executor);
> + } finally {
> + synchronized
> (BlockTransmitter.this) {
> + asyncExitStatusSet=true;
> +
> BlockTransmitter.this.notifyAll();
> + }
> + }
> + } },
> "BlockTransmitter:sendAsync() for "+this);
> }
>
> @@ -304,6 +315,19 @@
> }
> }
> }
> +
> + public boolean getAsyncExitStatus() {
> + synchronized (this) {
> + while (!asyncExitStatusSet) {
> + try {
> + this.wait(10*1000);
> + } catch (InterruptedException e) {
> + //ignore
> + }
> + }
> + }
> + return asyncExitStatus;
> + }
>
> public PeerContext getDestination() {
> return _destination;
>
> Modified: trunk/freenet/src/freenet/node/RequestHandler.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestHandler.java 2008-01-21
> 19:10:35
UTC (rev 17189)
> +++ trunk/freenet/src/freenet/node/RequestHandler.java 2008-01-21
> 19:17:14
UTC (rev 17190)
> @@ -28,7 +28,7 @@
> * is separated off into RequestSender so we get transfer coalescing
> * and both ends for free.
> */
> -public class RequestHandler implements Runnable, ByteCounter {
> +public class RequestHandler implements Runnable, ByteCounter,
RequestSender.Listener {
>
> private static boolean logMINOR;
> final Message req;
> @@ -45,6 +45,10 @@
> private RequestSender rs;
> private int status = RequestSender.NOT_FINISHED;
> private boolean appliedByteCounts=false;
> + private boolean sentRejectedOverload = false;
> + private long searchStartTime;
> + private long responseDeadline;
> + private BlockTransmitter bt;
>
> public String toString() {
> return super.toString()+" for "+uid;
> @@ -76,12 +80,14 @@
> freenet.support.Logger.OSThread.logPID(this);
> try {
> realRun();
> + //The last thing that realRun() does is register as a
> request-sender
listener, so any exception here is the end.
> } catch (NotConnectedException e) {
> - // Ignore, normal
> + Logger.normal(this, "requestor gone, could not start request
handler wait");
> + node.removeTransferringRequestHandler(uid);
> + node.unlockUID(uid, key instanceof NodeSSK, false, false);
> } catch (Throwable t) {
> Logger.error(this, "Caught "+t, t);
> - } finally {
> - node.removeTransferringRequestHandler(uid);
> + node.removeTransferringRequestHandler(uid);
> node.unlockUID(uid, key instanceof NodeSSK, false, false);
> }
> }
> @@ -150,26 +156,16 @@
> return;
> }
>
> - boolean shouldHaveStartedTransfer = false;
> - boolean sentRejectedOverload = false;
> + //If we cannot respond before this time, the 'source' node has
already fatally timed out (and we need not return packets which will not be
claimed)
> + searchStartTime = System.currentTimeMillis();
> + responseDeadline = searchStartTime +
> RequestSender.FETCH_TIMEOUT +
source.getProbableSendQueueTime();
> +
> + rs.addListener(this);
> + }
>
> - //If we cannot respond before this time, the 'source' node has
> already
fatally timed out (and we need not return packets which will not be claimed)
> - long searchStartTime = System.currentTimeMillis();
> - long responseDeadline = searchStartTime +
> RequestSender.FETCH_TIMEOUT +
source.getProbableSendQueueTime();
> - short waitStatus = 0;
> -
> - while(true) {
> -
> - waitStatus = rs.waitUntilStatusChange(waitStatus);
> - long now = System.currentTimeMillis();
> -
> - if (now > responseDeadline) {
> - Logger.error(this, "requestsender took too long
> to respond to requestor
("+TimeUtil.formatTime((now - searchStartTime), 2,
true)+"/"+rs.getStatus()+")");
> - applyByteCounts();
> - return;
> - }
> -
> - if((waitStatus & RequestSender.WAIT_REJECTED_OVERLOAD) != 0
&& !sentRejectedOverload) {
> + public void onReceivedRejectOverload() {
> + try {
> + if(!sentRejectedOverload) {
> // Forward RejectedOverload
> //Note: This message is only decernable from
> the terminal messages by
the IS_LOCAL flag being false. (!IS_LOCAL)->!Terminal
> Message msg = DMT.createFNPRejectedOverload(uid, false);
> @@ -177,17 +173,30 @@
> //If the status changes (e.g. to SUCCESS),
> there is little need to send
yet another reject overload.
> sentRejectedOverload=true;
> }
> -
> - if((waitStatus & RequestSender.WAIT_TRANSFERRING_DATA) != 0) {
> + } catch (NotConnectedException e) {
> + Logger.normal(this, "requestor is gone, can't forward
> reject overload");
> + }
> + }
> +
> + public void onCHKTransferBegins() {
> + try {
> // Is a CHK.
> Message df = DMT.createFNPCHKDataFound(uid,
rs.getHeaders());
> source.sendAsync(df, null, 0, this);
>
> PartiallyReceivedBlock prb = rs.getPRB();
> - BlockTransmitter bt =
> + bt =
> new BlockTransmitter(node.usm, source, uid, prb,
node.outputThrottle, this);
> node.addTransferringRequestHandler(uid);
> - if(bt.send(node.executor)) {
> + bt.sendAsync(node.executor);
> + } catch (NotConnectedException e) {
> + Logger.normal(this, "requestor is gone, can't begin CHK
> transfer");
> + }
> + }
> +
> + private void waitAndFinishCHKTransfer() throws NotConnectedException {
> + if (logMINOR) Logger.minor(this, "Waiting for CHK transfer to
> finish");
> + if(bt.getAsyncExitStatus()) {
> status = rs.getStatus();
> // Successful CHK transfer, maybe path fold
> finishOpennetChecked();
> @@ -196,14 +205,24 @@
> status = rs.getStatus();
> //for byte logging, since the block is
> the 'terminal' message.
> applyByteCounts();
> + unregisterRequestHandlerWithNode();
> }
> - return;
> - }
> + }
> +
> + public void onRequestSenderFinished(int status) {
> + long now = System.currentTimeMillis();
> +
> + if (now > responseDeadline) {
> + Logger.error(this, "requestsender took too long to
> respond to requestor
("+TimeUtil.formatTime((now - searchStartTime), 2,
true)+"/"+rs.getStatus()+")");
> + applyByteCounts();
> + unregisterRequestHandlerWithNode();
> + return;
> + }
> +
> + if(status == RequestSender.NOT_FINISHED)
> + Logger.error(this, "onFinished() but not finished?");
>
> - status = rs.getStatus();
> -
> - if(status == RequestSender.NOT_FINISHED) continue;
> -
> + try {
> switch(status) {
> case RequestSender.NOT_FINISHED:
> case RequestSender.DATA_NOT_FOUND:
> @@ -239,45 +258,56 @@
> } else {
> sendTerminal(df);
> }
> - return;
> } else {
> - if(!rs.transferStarted()) {
> + if(bt == null) {
> // Bug! This is impossible!
> Logger.error(this, "Status is SUCCESS
> but we never started
a transfer on "+uid);
> - // Could be a wierd synchronization
> bug, but we don't want
to wait forever, so treat it as overload.
> + // Obviously this node is confused,
> send a terminal reject
to make sure the requestor is not waiting forever.
> reject = DMT.createFNPRejectedOverload(uid, true);
> sendTerminal(reject);
> - return;
> } else {
> - // Race condition. We need to go around
> the loop again and
pick up the data transfer
> - // in waitStatus.
> + waitAndFinishCHKTransfer();
> }
> - // Either way, go back around the loop.
> - continue;
> }
> + return;
> case RequestSender.VERIFY_FAILURE:
> if(key instanceof NodeCHK) {
> - if(shouldHaveStartedTransfer)
> - throw new IllegalStateException("Got
> status code "+status+"
but transfer not started");
> - shouldHaveStartedTransfer = true;
> - continue; // should have started transfer
> + if(bt == null) {
> + // Bug! This is impossible!
> + Logger.error(this, "Status is
> VERIFY_FAILURE but we never
started a transfer on "+uid);
> + // Obviously this node
> is confused, send a terminal reject to make
sure the requestor is not waiting forever.
> + reject = DMT.createFNPRejectedOverload(uid, true);
> + sendTerminal(reject);
> + } else {
> + //Verify fails after
> receive() is complete, so we might as well
propagate it...
> + waitAndFinishCHKTransfer();
> + }
> + return;
> }
> reject = DMT.createFNPRejectedOverload(uid, true);
> sendTerminal(reject);
> return;
> case RequestSender.TRANSFER_FAILED:
> if(key instanceof NodeCHK) {
> - if(shouldHaveStartedTransfer)
> - throw new IllegalStateException("Got
> status code "+status+"
but transfer not started");
> - shouldHaveStartedTransfer = true;
> - continue; // should have started transfer
> + if(bt == null) {
> + // Bug! This is impossible!
> + Logger.error(this, "Status is
> TRANSFER_FAILED but we never
started a transfer on "+uid);
> + // Obviously this node
> is confused, send a terminal reject to make
sure the requestor is not waiting forever.
> + reject = DMT.createFNPRejectedOverload(uid, true);
> + sendTerminal(reject);
> + } else {
> + waitAndFinishCHKTransfer();
> + }
> + return;
> }
> - // Other side knows, right?
> + Logger.error(this, "finish(TRANSFER_FAILED) should not
> be
called on SSK?!?!");
> return;
> default:
> throw new IllegalStateException("Unknown status
code "+status);
> }
> - }
> + } catch (NotConnectedException e) {
> + Logger.normal(this, "requestor is gone, can't send
> terminal message");
> + }
> }
>
> /**
> @@ -313,10 +343,16 @@
> } else {
> //also for byte logging, since the block is the 'terminal'
message.
> applyByteCounts();
> + unregisterRequestHandlerWithNode();
> }
> }
> }
>
> + private void unregisterRequestHandlerWithNode() {
> + node.removeTransferringRequestHandler(uid);
> + node.unlockUID(uid, key instanceof NodeSSK, false, false);
> + }
> +
> /**
> * Sends the 'final' packet of a request in such a way that the thread
can be freed (made non-runnable/exit)
> * and the byte counter will still be accurate.
> @@ -352,6 +388,7 @@
> public void sent() {
> //For byte counting, this relies on the fact that the callback
will only be excuted once.
> applyByteCounts();
> + unregisterRequestHandlerWithNode();
> }
> }
>
> @@ -366,6 +403,7 @@
> (node.passOpennetRefsThroughDarknet() ||
> source.isOpennet()) &&
> finishOpennetInner(om)) {
> applyByteCounts();
> + unregisterRequestHandlerWithNode();
> return;
> }
>
> @@ -383,6 +421,7 @@
> if(om != null && (source.isOpennet() ||
node.passOpennetRefsThroughDarknet()) &&
> finishOpennetNoRelayInner(om)) {
> applyByteCounts();
> + unregisterRequestHandlerWithNode();
> return;
> }
>
>
> Modified: trunk/freenet/src/freenet/node/RequestSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:10:35
UTC (rev 17189)
> +++ trunk/freenet/src/freenet/node/RequestSender.java 2008-01-21 19:17:14
UTC (rev 17190)
> @@ -3,7 +3,9 @@
> * http://www.gnu.org/ for further details of the GPL. */
> package freenet.node;
>
> +import java.util.ArrayList;
> import java.util.HashSet;
> +import java.util.Iterator;
>
> import freenet.crypt.CryptFormatException;
> import freenet.crypt.DSAPublicKey;
> @@ -70,6 +72,8 @@
> private byte[] sskData;
> private SSKBlock block;
> private boolean hasForwarded;
> +
> + private ArrayList listeners=new ArrayList();
>
> // Terminal status
> // Always set finished AFTER setting the reason flag
> @@ -433,7 +437,8 @@
> synchronized(this) {
> notifyAll();
> }
> -
> + fireCHKTransferBegins();
> +
> BlockReceiver br = new BlockReceiver(node.usm,
> next, uid,
prb, this);
>
> try {
> @@ -580,10 +585,13 @@
> private volatile boolean hasForwardedRejectedOverload;
>
> /** Forward RejectedOverload to the request originator */
> - private synchronized void forwardRejectedOverload() {
> - if(hasForwardedRejectedOverload) return;
> - hasForwardedRejectedOverload = true;
> - notifyAll();
> + private void forwardRejectedOverload() {
> + synchronized (this) {
> + if(hasForwardedRejectedOverload) return;
> + hasForwardedRejectedOverload = true;
> + notifyAll();
> + }
> + fireReceivedRejectOverload();
> }
>
> public PartiallyReceivedBlock getPRB() {
> @@ -659,20 +667,27 @@
> if(status == SUCCESS)
> successFrom = next;
> }
> -
> +
> if(status == SUCCESS) {
> if(next != null) {
> next.onSuccess(false, key instanceof NodeSSK);
> }
> node.nodeStats.requestCompleted(true, source != null, key
instanceof NodeSSK);
>
> + //NOTE: because of the requesthandler implementation,
> this will block
and wait
> + // for downstream transfers on a CHK. The opennet
> stuff introduces
> + // a delay of it's own if we don't get the
> expected message.
> + fireRequestSenderFinished(code);
> +
> if(key instanceof NodeCHK && next != null &&
> (next.isOpennet() ||
> node.passOpennetRefsThroughDarknet()) ) {
> finishOpennet(next);
> } else
> finishOpennetNull(next);
> - } else
> + } else {
> node.nodeStats.requestCompleted(false, source != null, key
instanceof NodeSSK);
> + fireRequestSenderFinished(code);
> + }
>
> synchronized(this) {
> opennetFinished = true;
> @@ -852,4 +867,72 @@
> public boolean isLocalRequestSearch() {
> return (source==null);
> }
> +
> + interface Listener {
> + void onReceivedRejectOverload();
> + void onCHKTransferBegins();
> + void onRequestSenderFinished(int status);
> + }
> +
> + public void addListener(Listener l) {
> + boolean reject=false;
> + boolean transfer=false;
> + int status;
> + synchronized (this) {
> + synchronized (listeners) {
> + listeners.add(l);
> + }
> + reject=hasForwardedRejectedOverload;
> + transfer=transferStarted();
> + status=this.status;
> + }
> + if (reject)
> + l.onReceivedRejectOverload();
> + if (transfer)
> + l.onCHKTransferBegins();
> + if (status!=NOT_FINISHED)
> + l.onRequestSenderFinished(status);
> + }
> +
> + private void fireReceivedRejectOverload() {
> + synchronized (listeners) {
> + Iterator i=listeners.iterator();
> + while (i.hasNext()) {
> + Listener l=(Listener)i.next();
> + try {
> + l.onReceivedRejectOverload();
> + } catch (Throwable t) {
> + Logger.error(this, "Caught: "+t, t);
> + }
> + }
> + }
> + }
> +
> + private void fireCHKTransferBegins() {
> + synchronized (listeners) {
> + Iterator i=listeners.iterator();
> + while (i.hasNext()) {
> + Listener l=(Listener)i.next();
> + try {
> + l.onCHKTransferBegins();
> + } catch (Throwable t) {
> + Logger.error(this, "Caught: "+t, t);
> + }
> + }
> + }
> + }
> +
> + private void fireRequestSenderFinished(int status) {
> + synchronized (listeners) {
> + Iterator i=listeners.iterator();
> + while (i.hasNext()) {
> + Listener l=(Listener)i.next();
> + try {
> + l.onRequestSenderFinished(status);
> + } catch (Throwable t) {
> + Logger.error(this, "Caught: "+t, t);
> + }
> + }
> + }
> + }
> }
>
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
>
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
URL:
<https://emu.freenetproject.org/pipermail/devl/attachments/20080122/db2fc4e6/attachment.pgp>