Update of /cvsroot/freenet/freenet/src/freenet/node/states/data
In directory sc8-pr-cvs1:/tmp/cvs-serv28750/src/freenet/node/states/data
Modified Files: SendData.java Log Message: 6308: Fix bug in SendData w.r.t. padding. If this works, it should reduce connection churn amongst other things. Also reindent. Index: SendData.java =================================================================== RCS file: /cvsroot/freenet/freenet/src/freenet/node/states/data/SendData.java,v retrieving revision 1.33 retrieving revision 1.34 diff -u -w -r1.33 -r1.34 --- SendData.java 31 Oct 2003 19:35:40 -0000 1.33 +++ SendData.java 4 Nov 2003 19:08:36 -0000 1.34 @@ -18,7 +18,6 @@ import freenet.node.ds.KeyInputStream; import freenet.support.Logger; - /** * Sends data from the store. If the reading from the store fails, then * the sent data will be padded until the end of the next part, where @@ -35,7 +34,8 @@ private final TrailerWriter send; private final KeyInputStream in; - private final TrailerWriteCallbackMessage myTWCM; // Use a message because of blocking I/O from store + private final TrailerWriteCallbackMessage myTWCM; + // Use a message because of blocking I/O from store private boolean closedSend = false, closedIn = false; private final long length, partSize; private Exception abortedException = null; @@ -54,29 +54,52 @@ int m = 0; private boolean hadDSI = false; private boolean waitingForWriteNotify = false; + private boolean lastNonPaddingChunk = false; - - public SendData(long id, long parent, TrailerWriter send, - KeyInputStream in, long length, long partSize, + public SendData( + long id, + long parent, + TrailerWriter send, + KeyInputStream in, + long length, + long partSize, Node n) { super(id, parent); this.send = send; this.in = in; - if(in == null) throw new IllegalArgumentException("null in"); + if (in == null) + throw new IllegalArgumentException("null in"); this.length = length; this.partSize = partSize; this.n = n; myTWCM = new TrailerWriteCallbackMessage(id, n, this); logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this); if(logDEBUG) - Core.logger.log(this, "Creating SendData("+this+")", 
Logger.DEBUG); + Core.logger.log( + this, + "Creating SendData(" + this +")", + Logger.DEBUG); } public String toString() { - return super.toString()+": send="+send+", in="+in+", moved="+moved+"/"+ - length+", partSize="+partSize+",result="+result+ - ",lastPacketLength="+lastPacketLength+ - (paddingLength != 0 ? (",sentPadding="+sentPadding+"/"+paddingLength) : ""); + return super.toString() + + ": send=" + + send + + ", in=" + + in + + ", moved=" + + moved + + "/" + + length + + ", partSize=" + + partSize + + ",result=" + + result + + ",lastPacketLength=" + + lastPacketLength + + (paddingLength != 0 + ? (",sentPadding=" + sentPadding + "/" + paddingLength) + : ""); } public final long length() { @@ -95,16 +118,21 @@ } /** If sending upstream, you want CB_ABORTED. - * If sending downstream, you want CB_RESTARTED. + * If sending downstream, you + * want CB_RESTARTED. */ public final void abort(int cb) { silent = true; result = cb; if(Core.logger.shouldLog(Logger.DEBUG,this)) { abortedException = new Exception("debug"); - Core.logger.log(this, "Aborted send for "+ - Long.toHexString(parent)+" with cb="+ - Integer.toHexString(cb), abortedException, + Core.logger.log( + this, + "Aborted send for " + + Long.toHexString(parent) + + " with cb=" + + Integer.toHexString(cb), + abortedException, Logger.DEBUG); } } @@ -115,41 +143,43 @@ public void finalize() { try { - if(!closedIn) in.close(); - } catch (IOException e) {}; - if(send != null && !closedSend) send.close(); + if (!closedIn) + in.close(); + } catch (IOException e) { + }; + if (send != null && !closedSend) + send.close(); } - /** Sheesh! We're too overworked to even try to write CB_ABORTED. + /** + * Sheesh! We're too overworked to even try to write CB_ABORTED. 
*/ public final void lost(Node n) { try { in.close(); closedIn = true; } catch (IOException e) { - Core.logger.log(this, "I/O error closing KeyInputStream", - e, Logger.ERROR); + Core.logger.log( + this, + "I/O error closing KeyInputStream", + e, + Logger.ERROR); } - if(send != null) send.close(); + if (send != null) + send.close(); closedSend = true; } - - /* Async sending + /* + * Async sending * - * received(n, DataStateInitiator) -> - * Reset buffer pointer - * Read into buffer + * received(n, DataStateInitiator) -> Reset buffer pointer Read into buffer * Start write * - * closed() -> - * Set CB to CB_SEND_CONN_DIED - * Run the block that used to be finally in received(..) to finish properly + * closed() -> Set CB to CB_SEND_CONN_DIED Run the block that used to be + * finally in received(..) to finish properly * - * written() -> - * Reset buffer pointer - * Read into buffer - * Start write + * written() -> Reset buffer pointer Read into buffer Start write */ public State received(Node n, MessageObject mo) throws BadStateException { boolean isDSI = (mo instanceof DataStateInitiator); @@ -169,9 +199,19 @@ if (isTWCM) { waitingForWriteNotify = false; - if (logDEBUG) Core.logger.log(this, "Got " + myTWCM + " for " + this, Logger.DEBUG); + if (logDEBUG) + Core.logger.log( + this, + "Got " + myTWCM + " for " + this, + Logger.DEBUG); if (!myTWCM.finished) { - Core.logger.log(this, "Got a TWCM that was not finished!: " + myTWCM + " for " + this, Logger.ERROR); + Core.logger.log( + this, + "Got a TWCM that was not finished!: " + + myTWCM + + " for " + + this, + Logger.ERROR); return this; } if (!myTWCM.success) { @@ -179,10 +219,11 @@ if (handleThrowable(null, true) == null) return null; } else { - if (inPaddingMode) + if (inPaddingMode && !lastNonPaddingChunk) sentPadding += lastPacketLength; else moved += lastPacketLength; + lastNonPaddingChunk = false; } } @@ -195,7 +236,8 @@ if (bufferEndPtr == -1) throw new IOException("read failed: " + this); } catch (Throwable t) 
{ - return handleThrowable(t, false); // will do termination and failure code + return handleThrowable(t, false); + // will do termination and failure code } } try { @@ -212,63 +254,123 @@ /** * Read bytes from the store into the buffer, starting at the beginning. * Return the number read. - * @throws IOException if something breaks + * + * @throws IOException + * if something breaks */ protected int doRead() throws IOException { - return in.read(buffer, 0, (int) Math.min(length - moved, buffer.length)); + return in.read( + buffer, + 0, + (int) Math.min(length - moved, buffer.length)); } /** * Start to write some bytes to the connection */ - protected void startWrite(int bytes) throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException { - if(bytes <= 0) return; + protected void startWrite(int bytes) + throws + UnknownTrailerSendIDException, + TrailerSendFinishedException, + AlreadySendingTrailerChunkException, + IOException { + if (bytes <= 0) + return; myTWCM.reset(); lastPacketLength = bytes; waitingForWriteNotify = true; - if(send != null) send.writeTrailing(buffer, 0, bytes, myTWCM); + if (send != null) + send.writeTrailing(buffer, 0, bytes, myTWCM); } /** - * Handle a throwable thrown during I/O. We are expected to close the connection, - * etc. - * @param t the throwable causing the failure, can be null. - * @param inWrite whether we were writing at the time. false means we were reading. + * Handle a throwable thrown during I/O. We are expected to close the + * connection, etc. + * + * @param t + * the throwable causing the failure, can be null. + * @param inWrite + * whether we were writing at the time. false means we were + * reading. 
*/ protected State handleThrowable(Throwable t, boolean inWrite) { if(logDEBUG) { if(t == null) - Core.logger.log(this, "SendData.handleThrowable(null,"+inWrite+ - ") on "+this, Logger.DEBUG); + Core.logger.log( + this, + "SendData.handleThrowable(null," + inWrite + ") on " + this, + Logger.DEBUG); else - Core.logger.log(this, "SendData.handleThrowable("+t+","+inWrite+ - ") on "+this, t, Logger.DEBUG); + Core.logger.log( + this, + "SendData.handleThrowable(" + + t + + "," + + inWrite + + ") on " + + this, + t, + Logger.DEBUG); } if(t == null) { if(result == -1) - Core.logger.log(this, "handleThrowable caller must set result if passing null Throwable!", new Exception("grrr"), Logger.ERROR); + Core.logger.log( + this, + "handleThrowable caller must set result if passing null Throwable!", + new Exception("grrr"), + Logger.ERROR); } else if(t instanceof IOException) { - if(inWrite) result = Presentation.CB_SEND_CONN_DIED; + if (inWrite) + result = Presentation.CB_SEND_CONN_DIED; else { int ifc = in.getFailureCode(); if(ifc == -1) { - if(logDEBUG) Core.logger.log(this, "Cache failed between writing "+ - "and reading for "+Long.toHexString(id)+": "+ - t, t, Logger.DEBUG); + if (logDEBUG) + Core.logger.log( + this, + "Cache failed between writing " + + "and reading for " + + Long.toHexString(id) + + ": " + + t, + t, + Logger.DEBUG); result = Presentation.CB_CACHE_FAILED; - } else result = ifc; + } else + result = ifc; if (result == Presentation.CB_CACHE_FAILED) { - Core.logger.log(this, "Cache failed signalled after exception " + - "after " + moved + " of " + length - + " bytes: "+t+" for "+Long.toHexString(id)+ - " ("+Long.toHexString(parent)+".", t, + Core.logger.log( + this, + "Cache failed signalled after exception " + + "after " + + moved + + " of " + + length + + " bytes: " + + t + + " for " + + Long.toHexString(id) + + " (" + + Long.toHexString(parent) + + ".", + t, Logger.ERROR); } } } else { - Core.logger.log(this, "Unexpected exception "+t+" in SendData "+ - 
this+" (inWrite="+inWrite+")", t, Logger.ERROR); - result = Presentation.CB_CACHE_FAILED; // well, sorta + Core.logger.log( + this, + "Unexpected exception " + + t + + " in SendData " + + this + + " (inWrite=" + + inWrite + + ")", + t, + Logger.ERROR); + result = Presentation.CB_CACHE_FAILED; // well, + // sorta } Core.diagnostics.occurrenceBinomial("sentData", 1, 0); @@ -276,100 +378,180 @@ in.close(); closedIn = true; } catch (IOException e) { - Core.logger.log(this, "I/O error closing KeyInputStream", - e, Logger.ERROR); + Core.logger.log( + this, + "I/O error closing KeyInputStream", + e, + Logger.ERROR); } - Core.logger.log(this, "Send failed for "+Long.toHexString(id)+ - " ("+Long.toHexString(parent)+" - result="+ - Long.toHexString(result)+", cause: "+t, Logger.MINOR); + Core.logger.log( + this, + "Send failed for " + + Long.toHexString(id) + + " (" + + Long.toHexString(parent) + + " - result=" + + Long.toHexString(result) + + ", cause: " + + t, + Logger.MINOR); if(inWrite) { - if(send != null) send.close(); + if (send != null) + send.close(); closedSend = true; } else if(!inPaddingMode) { if(moved == length) { if(result == -1) - Core.logger.log(this, "WTF? moved = length in handleThrowable "+ - "for "+this, new Exception("debug"), + Core.logger.log( + this, + "WTF? 
moved = length in handleThrowable " + + "for " + + this, + new Exception("debug"), Logger.NORMAL); } else { try { startWritePadding(); return this; - // don't send the DataSent until the padding has finished writing + // don't send the DataSent until the padding has finished + // writing } catch (IOException e) { // Failed t = e; } catch (TrailerException e) { - Core.logger.log(this, "Got "+e+" starting writing padding for "+this, - e, Logger.ERROR); + Core.logger.log( + this, + "Got " + e + " starting writing padding for " + this, + e, + Logger.ERROR); } } // Failed or already at end - if(send != null) send.close(); + if (send != null) + send.close(); closedSend = true; } else { // Padding failed - if(send != null) send.close(); + if (send != null) + send.close(); closedSend = true; } buffer = null; // early GC - if(!silent) n.schedule(new DataSent(this)); + if (!silent) + n.schedule(new DataSent(this)); return null; } - protected void startWritePadding() throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException { + protected void startWritePadding() + throws + UnknownTrailerSendIDException, + TrailerSendFinishedException, + AlreadySendingTrailerChunkException, + IOException { // Pad until end of part inPaddingMode = true; int controlLength = Key.getControlLength(); long tmpLen = partSize + controlLength; - paddingLength = Math.min(tmpLen - moved % tmpLen, - length - moved); + paddingLength = Math.min(tmpLen - moved % tmpLen, length - moved); // Either way, it includes the padding byte - and the part hash - if(!waitingForWriteNotify) + if (!waitingForWriteNotify) { + Core.logger.log( + this, + "Writing first padding chunk for " + this, + Logger.DEBUG); sendWritePadding(); + } else { + Core.logger.log(this, + "Deferring first padding chunk for " + this, + Logger.DEBUG); + lastNonPaddingChunk = true; + } } - protected void sendWritePadding() throws UnknownTrailerSendIDException, TrailerSendFinishedException, 
AlreadySendingTrailerChunkException, IOException { + protected void sendWritePadding() + throws + UnknownTrailerSendIDException, + TrailerSendFinishedException, + AlreadySendingTrailerChunkException, + IOException { byte[] stuffToSend; long remainingPadding = paddingLength - sentPadding; if(logDEBUG) - Core.logger.log(this, "sendWritePadding(): paddingLength="+ - paddingLength+"/"+sentPadding+" ("+this+")", + Core.logger.log( + this, + "sendWritePadding(): paddingLength=" + + paddingLength + + "/" + + sentPadding + + " (" + + this + + ")", Logger.DEBUG); - if(remainingPadding <= 0) return; // we will get finished - if(remainingPadding < (Core.blockSize/*Key.getControlLength()*/)) { + if (remainingPadding <= 0) + return; // we will get finished + if (remainingPadding < (Core.blockSize /* Key.getControlLength() */ + )) { // Last chunk, yay stuffToSend = new byte[(int)remainingPadding]; Random r = new Random(Core.getRandSource().nextLong()); - r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded + r.nextBytes(stuffToSend); + // is this necessary? it used to be 0 padded // FIXME: assumes getControlLength() == 1 - stuffToSend[stuffToSend.length-1] = (byte) - ((result == Presentation.CB_ABORTED) ? - Presentation.CB_ABORTED : Presentation.CB_RESTARTED); + stuffToSend[stuffToSend.length - 1] = + (byte) ((result == Presentation.CB_ABORTED) + ? Presentation.CB_ABORTED + : Presentation.CB_RESTARTED); lastPacketLength = stuffToSend.length; } else { // Just another chunk stuffToSend = new byte[Core.blockSize]; Random r = new Random(Core.getRandSource().nextLong()); - r.nextBytes(stuffToSend); // is this necessary? it used to be 0 padded + r.nextBytes(stuffToSend); + // is this necessary? 
it used to be 0 padded lastPacketLength = Core.blockSize; } myTWCM.reset(); waitingForWriteNotify = true; - if(send != null) send.writeTrailing(stuffToSend, 0, - stuffToSend.length, myTWCM); + if (send != null) + send.writeTrailing(stuffToSend, 0, stuffToSend.length, myTWCM); } protected State finish() { - if (!(moved == length || (inPaddingMode && sentPadding >= paddingLength) || (result != -1 && !inPaddingMode))) { - Core.logger.log(this, "Not finishing because moved=" + moved + "/" + length + ", inPaddingMode=" + inPaddingMode + ", sentPadding=" + sentPadding + "/" + paddingLength + " (" + this +")", Logger.DEBUG); + if (!(moved == length + || (inPaddingMode && sentPadding >= paddingLength) + || (result != -1 && !inPaddingMode))) { + Core.logger.log( + this, + "Not finishing because moved=" + + moved + + "/" + + length + + ", inPaddingMode=" + + inPaddingMode + + ", sentPadding=" + + sentPadding + + "/" + + paddingLength + + " (" + + this + + ")", + Logger.DEBUG); return this; } if (inPaddingMode && sentPadding > paddingLength) - Core.logger.log(this, "sentPadding=" + sentPadding + "/" + paddingLength + " (" + this +")", Logger.NORMAL); + Core.logger.log( + this, + "sentPadding=" + + sentPadding + + "/" + + paddingLength + + " (" + + this + + ")", + Logger.NORMAL); if (result != -1) { // We were aborted return handleThrowable(null, false); @@ -381,135 +563,19 @@ in.close(); closedIn = true; } catch (IOException e) { - Core.logger.log(this, "Caught " + e + " closing input (successful): " + this, Logger.NORMAL); + Core.logger.log( + this, + "Caught " + e + " closing input (successful): " + this, + Logger.NORMAL); } if (moved == length) result = Presentation.CB_OK; - Core.diagnostics.occurrenceBinomial("sentData", 1, result == Presentation.CB_OK ? 1 : 0); + Core.diagnostics.occurrenceBinomial( + "sentData", + 1, + result == Presentation.CB_OK ? 
1 : 0); if (!silent) n.schedule(new DataSent(this)); return null; }; - -// try { -// while (moved < length) { -// inWrite = false; -// if (result != -1) throw new CancelledIOException(); -// int m = in.read(buffer, 0, (int) Math.min(length - moved, buffer.length)); -// if (m == -1) { -// throw new IOException("Stopped short of full transfer"); -// } -// inWrite = true; -// if(logDEBUG) -// Core.logger.log(this, "Read "+(moved+m)+" of "+length+ -// " bytes for "+Long.toHexString(parent)+" ("+in+")", -// Logger.DEBUG); -// send.write(buffer, 0, m); -// moved += m; -// if(logDEBUG) -// Core.logger.log(this, "Moved "+moved+" of "+length+" bytes for "+ -// Long.toHexString(parent)+" ("+in+")", -// Logger.DEBUG); -// } -// send.close(); -// closedSend = true; -// result = Presentation.CB_OK; -// } catch (CancelledIOException e) { -// if(logDEBUG) -// Core.logger.log(this, "Cancelled IO: "+abortedException+" for "+ -// Long.toHexString(parent), abortedException, -// Logger.DEBUG); -// } catch (IOException e) { -// // if it was aborted we can expect the aborter to set the failure -// // code. -// result = (inWrite ? Presentation.CB_SEND_CONN_DIED -// : in.getFailureCode()); -// if (result == -1) // it broke some time between writing and reading -// { -// if(logDEBUG) Core.logger.log(this, "Cache failed between writing "+ -// "and reading for "+Long.toHexString(id), -// e, Logger.DEBUG); -// result = Presentation.CB_CACHE_FAILED; -// } -// if (result == Presentation.CB_CACHE_FAILED) { -// Core.logger.log(this, -// "Cache failed signalled after exception " + -// "after " + moved + " of " + length -// + " bytes: "+e+" for "+Long.toHexString(id)+ -// " ("+Long.toHexString(parent)+".", e , -// Logger.ERROR); -// } -// } finally { - -// n.diagnostics.occurrenceBinomial("sentData", 1, -// result == Presentation.CB_OK ? 
-// 1 : 0); - -// if(result != Presentation.CB_OK) { -// if(abortedException != null) { -// Core.logger.log(this, "Send aborted for "+Long.toHexString(id)+ -// " ("+Long.toHexString(parent)+" - result="+ -// Long.toHexString(result), -// abortedException, Logger.MINOR); -// abortedException = null; -// } else { -// Core.logger.log(this, "Send failed for "+Long.toHexString(id)+ -// " ("+Long.toHexString(parent)+" - result="+ -// Long.toHexString(result), Logger.MINOR); -// } -// } - -// buffer = null; - -// try { -// in.close(); -// closedIn = true; -// } catch (IOException e) { -// Core.logger.log(this, "I/O error closing KeyInputStream", -// e, Logger.ERROR); -// } - -// if (moved < length && !inWrite) { -// try { -// // pad until end of part -// long tmpLen = partSize + Key.getControlLength(); -// tmpLen = Math.min(tmpLen - moved % tmpLen, length - moved) - 1; - -// byte[] zeroes = new byte[Core.blockSize]; -// while (tmpLen > 0) { -// int m = (int) Math.min(tmpLen, zeroes.length); -// send.write(zeroes, 0, m); -// tmpLen -= m; -// } -// send.write(result == Presentation.CB_ABORTED ? Presentation.CB_ABORTED -// : Presentation.CB_RESTARTED); -// } catch (IOException e) { -// // this is important b/c it lets the parent chain know that -// // it shouldn't try to get back in touch with the upstream -// result = Presentation.CB_SEND_CONN_DIED; -// } -// } - -// if (result != Presentation.CB_OK) { -// try { -// send.close(); -// closedSend = true; -// } -// catch (IOException e) { -// Core.logger.log(this, "I/O error closing data send stream", -// e, Logger.MINOR); -// } -// } - -// // we could be exiting with an uncaught exception or something.. 
-// if (result == -1) result = Presentation.CB_SEND_CONN_DIED; - -// // had to wait around to see if we'd get a CB_SEND_CONN_DIED -// // when padding the write -// if (!silent) n.schedule(new DataSent(this)); -// } - -// return null; -// } } - _______________________________________________ cvs mailing list [EMAIL PROTECTED] http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs