Update of /cvsroot/freenet/freenet/src/freenet In directory sc8-pr-cvs1:/tmp/cvs-serv306/src/freenet
Modified Files: ConnectionHandler.java Log Message: Major refactoring. Moved all variables handling trailer sending state into a separate inner class and added some extra state safeguards here and there (hopefully none too paranoid though). Review would be much appreciated. Index: ConnectionHandler.java =================================================================== RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v retrieving revision 1.213 retrieving revision 1.214 diff -u -w -r1.213 -r1.214 --- ConnectionHandler.java 3 Dec 2003 15:41:16 -0000 1.213 +++ ConnectionHandler.java 4 Dec 2003 15:27:41 -0000 1.214 @@ -159,15 +159,169 @@ } private DataTransferAccounter transferAccounter = new DataTransferAccounter(); // private volatile SendOutputStream currentSOS = null; - private static Object trailerSendIDCounterLock = new Object(); - private static int trailerSendIDCounter = 0; - private volatile int trailerSendID = -1; - private volatile long trailerSendLength = 0; - private volatile int trailerSentBytes = 0; - private volatile boolean isSendingTrailerChunk = false; - private final Object trailerSendLock = new Object(); - private int sendingTrailerChunkBytes = 0; - private TrailerWriteCallback twcb; + + + private static class TrailerSendState { + private volatile int trailerSendID = -1; //The ID of the trailer-send currently in progress + private volatile long trailerSendLength = 0; //The complete length of the trailer currently in progress (how many trailerbytes will we send) + private volatile int trailerSentBytes = 0; //How many bytes of the trailer have we currently sent + private volatile boolean isSendingTrailerChunk = false; //Are we currently sending a chunk of the trailer //TODO: Replace with trailerChunkBytes>0 ? 
+ private int trailerChunkBytes = 0; //The number of bytes in the currently sending trailer chunk + private TrailerWriteCallback twcb; //Who should we notify when the current chunk has been sent + private ConnectionHandler ch; //TODO: For logging purposes only, remove when appropriate + private static TrailerSendIDSource trailerSendIDSource = new TrailerSendIDSource(); //Used to deliver unique ID:s to each new trailersend + + private static class TrailerSendIDSource + { + private int nextID = 0; + synchronized int getNext(){ + nextID++; + if(nextID <0) //Handle possible wraparound + nextID = 0; + return nextID; + } + } + + public class PartialChunkSentException extends Exception + { + PartialChunkSentException(String s){ + super(s); + } + } + //TODO: Ugly hack, remove as soon as appropriate + void setCH(ConnectionHandler ch) { + this.ch = ch; + } + + //returns true if we currently is sending a trailer (assuming that everyone has notified this context correctly) + public boolean isSendingTrailer() { + return trailerSendID != -1; + } + public boolean isSendingTrailerChunk(){ + return isSendingTrailerChunk; + } + //returns the number of trailer bytes ready to send reasonably quickly + public synchronized long trailerLengthAvailable() { + if (isSendingTrailer()) { + if (twcb == null) //TODO: Can this happen, maybe a IllegalStateException should be thrown? + return trailerSendLength - trailerSentBytes; + else + return twcb.bytesAvailable(); + } else //TODO: should this happen, maybe a IllegalStateException should be thrown? 
+ return 0; + } + + //returns true if not all of the trailer is sent yet + public boolean hasTrailerDataRemaining() { + if (!isSendingTrailer()) + throw new IllegalStateException("Not sending a trailer"); + return trailerSentBytes < trailerSendLength; + } + + public void reset() { + TrailerWriteCallback temp; + synchronized (this) { + isSendingTrailerChunk = false; + trailerChunkBytes = 0; + trailerSendLength = 0; + trailerSentBytes = 0; + trailerSendID = -1; + temp = twcb; + twcb = null; + } + if (temp != null) + temp.closed(); //Should not be called synchronized on this?! + } + //Initializes this context for sending a new trailer of length 'length' + //returns the SendID + public synchronized int startNew(long length) { + if (isSendingTrailer()) + throw new IllegalStateException("Already sending a trailer"); + if (isSendingTrailerChunk()) + throw new IllegalStateException("Already sending a trailer chunk"); //TODO: This check might be unnecesary + if(twcb != null) + throw new IllegalStateException("Has a twcb"); //TODO: This check might be unnecesary + reset(); //This might also be an unnecesary call.. + trailerSendID = trailerSendIDSource.getNext(); + trailerSendLength = length; + return trailerSendID; + } + //Called internally whenever a chunk has been completely written, should never be called by 'NOT this' + //Notifies the registered callback that it has happened + private void registerChunkSendCompleted() { + if (ch.logDEBUG) + ch.logDEBUG("Complete trailer chunk sent"); + synchronized (this) { + trailerSentBytes += trailerChunkBytes; + isSendingTrailerChunk = false; + trailerChunkBytes = 0; + } + if (twcb != null) { + if (ch.logDEBUG) + ch.logDEBUG("Calling " + twcb + ".written()"); + twcb.written(); //Should not be called synchronized on this?! + //TODO: Forget the twcb here maybe? + } + } + //Registers a close of the current trailersend with this context + //The parameter 'expectedTrailerSendID' is used only for a sanity check.. 
it ought to match our internal id unless someone has done something bad + public synchronized void closeTrailerSend(int expectedTrailerSendID) { + if (ch.logDEBUG) + ch.logDEBUG("closeTrailer(" + expectedTrailerSendID + "): " + twcb, true); + if (!isSendingTrailer()) + throw new IllegalStateException("Not sending a trailer"); + if (isSendingTrailerChunk()) + Core.logger.log(this, "Closing trailer " + expectedTrailerSendID + " on " + this +" while still sending trailer chunk!", new Exception("debug"), Logger.ERROR); + if (trailerSendID == expectedTrailerSendID) { + if (hasTrailerDataRemaining()) + Core.logger.log(this, "Called closeTrailer, when only sent " + trailerSentBytes + " of " + trailerSendLength + " (" + this +")", new Exception("hrrm"), Logger.MINOR); + reset(); + }else + throw new IllegalStateException("Asked to close trailersend #"+expectedTrailerSendID+" when actually sending #"+trailerSendID); + // else + // if (isSendingTrailer()) + // Core.logger.log(this, "Closing trailer " + expectedTrailerSendID + " when handling " + trailerSendID + "! (" + this +")", new Exception("hrrm"), Logger.ERROR); + } + //registers a start of send of a new chunk of the trailer + //The parameter 'expectedTrailerSendID' is used only for a sanity check.. 
it ought to match our internal id unless someone has done something bad + public void registerStartChunkWrite(int expectedTrailerSendID, int offset, int length, TrailerWriteCallback cb) throws UnknownTrailerSendIDException, AlreadySendingTrailerChunkException, TrailerSendFinishedException { + if (ch.logDEBUG) + Core.logger.log(this, "writeTrailing(" + expectedTrailerSendID + ",byte[]," + offset + "," + length + "," + cb + " on " + this +" (id = " + trailerSendID + ")", new Exception("debug"), Logger.DEBUG); + if(!isSendingTrailer()) + throw new IllegalStateException("Trying to write chunk with no trailer send in progress"); + if (trailerSendID != expectedTrailerSendID) + throw new UnknownTrailerSendIDException(); + if (trailerSentBytes >= trailerSendLength) { + trailerSendID = -1; //TODO: a complete reset somewhere around here? /Iakin + throw new TrailerSendFinishedException(); + } + if (isSendingTrailerChunk()) + throw new AlreadySendingTrailerChunkException(); + if (length <= 0) + throw new IllegalArgumentException(); + if(twcb != null && twcb != cb) //TODO: Should this really throw.. introduced during Iakins 2003-12-04 refactoring + throw new IllegalStateException("Got new cb"); + twcb = cb; + trailerChunkBytes = length; + isSendingTrailerChunk = true; + + } + //Should be called whenever a chunk of the trailer has been sent. 
Throws if the send wasn't all of the current chunk + public synchronized void registerChunkWritten(long size) throws PartialChunkSentException { + if (ch.logDEBUG) + ch.logDEBUG("At least a part of a trailer chunk sent"); + if (size == trailerChunkBytes) { + registerChunkSendCompleted(); + }else + throw new PartialChunkSentException("Partial chunk sent"); + } + public String toString() + { + return "trailerID="+String.valueOf(trailerSendID); + } + } + private final TrailerSendState trailerSendState = new TrailerSendState(); + ReceiveInputStream currentInputStream = null; private PeerHandler peerHandler = null; // NOT final - see end of registerOCM @@ -181,14 +335,7 @@ public long trailerLengthAvailable() { if(sendClosed.state()) return 0; - synchronized(trailerSendLock) { - if(trailerSendID != -1) { - if(twcb == null) - return trailerSendLength - trailerSentBytes; - else - return twcb.bytesAvailable(); - } else return 0; - } + return trailerSendState.trailerLengthAvailable(); } //private Thread exec_instance; // execution thread @@ -241,7 +388,7 @@ //profiling //WARNING:remove before release - public static class ProfilingHelperTool + public static class ProfilingHelper { public volatile int instances=0; public volatile long terminatedInstances=0; @@ -285,9 +432,7 @@ } } } - public static final ProfilingHelperTool profilingHelperTool = new ProfilingHelperTool(); - -// private static final Object profLockTerminated = new Object(); + public static final ProfilingHelper profilingHelperTool = new ProfilingHelper(); //CHIOS notification flags protected volatile boolean CHISwaitingForNotification=false; @@ -363,6 +508,7 @@ this.maxPadding = maxPad; this.outbound = outbound; this.identity = link.getPeerIdentity(); + this.trailerSendState.setCH(this);//TODO: Ugly hack, remove when appropriate peer = new Peer(identity, link.getPeerAddress(), link.getManager(), p); @@ -396,21 +542,17 @@ isFNP = link instanceof FnpLink ? 
true : false; //there may be more elegant way peerHandler = registerPeerHandler(); } catch (IOException e) { - Core.logger.log(this, "IOException constructing "+this+": "+e+ - "("+link+") for "+this+", terminating", e, - Logger.MINOR); + Core.logger.log(this, "IOException constructing " + this +": " + e + "(" + link + ") for " + this +", terminating", e, Logger.MINOR); // Probably closed terminate(); throw e; } catch (Error e) { terminate(); - Core.logger.log(this, "Got "+e+" in CH.<init> for "+this, - Logger.ERROR); + Core.logger.log(this, "Got " + e + " in CH.<init> for " + this, Logger.ERROR); throw e; } catch (RuntimeException e) { terminate(); - Core.logger.log(this, "Got "+e+" in CH.<init> for "+this, - Logger.ERROR); + Core.logger.log(this, "Got " + e + " in CH.<init> for " + this, Logger.ERROR); throw e; } finally { // Finalizer is called even if constructor throws @@ -494,15 +636,14 @@ } byte[] toSend; if(receiveClosed.state() && receivingCount <= 0 && - sendClosed.state() && trailerSendID == -1) { + sendClosed.state() && !trailerSendState.isSendingTrailer()) { logDEBUG("terminating at beginning of registerOCM"); terminate(); return; } try { synchronized(sentPacketLock) { - sentPacket = peerHandler.getPacket(link, p, identity, i, - 0, false); + sentPacket = peerHandler.getPacket(link, p, identity, i, 0, false); // It does not matter when it gets the Identify so timeout 0 if(logDEBUG) Core.logger.log(this, "Sending "+sentPacket+" on "+this+ @@ -514,15 +655,14 @@ if(sentPacket != null) innerSendPacket(sentPacket.priority()); if(receiveClosed.state() && receivingCount <= 0 && - sendClosed.state() && trailerSendID == -1) { + sendClosed.state() && !trailerSendState.isSendingTrailer()) { logDEBUG("terminating at end of registerOCM"); terminate(); } else { try { peerHandler.registerConnectionHandler(this); } catch (RemovingPeerHandlerException e) { - logDEBUG("Waiting for PeerHandler to finish removing: "+ - peerHandler+": "+e); + logDEBUG("Waiting for 
PeerHandler to finish removing: " + peerHandler + ": " + e); peerHandler.waitForRemovedFromOCM(); peerHandler = ocm.makePeerHandler(identity, null, p); } @@ -766,9 +906,7 @@ try { rsl.register(chan, this); } catch (Throwable t) { - Core.logger.log(this, "Thrown "+t+ - " reregistering "+this, - Logger.ERROR); + Core.logger.log(this, "Thrown " + t + " reregistering " + this, Logger.ERROR); terminate(); return -1; } finally { @@ -1000,7 +1138,7 @@ synchronized(sendLock) { if(sendClosed.state()) { if(logDEBUG) logDEBUG("Forcing close RIGHT NOW"); - if(trailerSendID == -1) { + if(!trailerSendState.isSendingTrailer()) { Core.logger.log(this, "Terminating "+this+" in innerProcess(), other side asked and no trailers sending", Logger.DEBUG); terminate(); @@ -1020,9 +1158,7 @@ if(logDEBUG) logDEBUG("Returning to RSL because no remaining "+ "or moving trailing fields"); if(origDecryptLen > 0) { - Core.diagnostics. - occurrenceContinuous("messagesInPacketReceived", - msgsThisTime); + Core.diagnostics.occurrenceContinuous("messagesInPacketReceived", msgsThisTime); if(logDEBUG) logDEBUG("messagesInPacketReceived: "+msgsThisTime); } @@ -1102,8 +1238,7 @@ //Updates the peerHandler and this connection with a new NodeReference for the peer private void handleReceivedIdentifyMessage(Identify id) { if(peerHandler == null) - Core.logger.log(this, "peerHandler NULL on "+this, new Exception("debug"), - Logger.ERROR); + Core.logger.log(this, "peerHandler NULL on " + this, new Exception("debug"), Logger.ERROR); else peerHandler.updateReference(id.getSource()); if(Core.logger.shouldLog(Logger.MINOR,this)) @@ -1120,7 +1255,7 @@ Core.logger.log(this, "Partial notification: "+size+" bytes written successfully on "+ this, Logger.DEBUG); lastSizeDone = size; - if(trailerSendID == -1) { + if(!trailerSendState.isSendingTrailer()) { if(sentPacket == null) { if(!finalized.state()) Core.logger.log(this, "sentPacket NULL in jobPartDone! 
for "+ @@ -1147,7 +1282,7 @@ //tell everybody they failed //this is where the PeerHandler will really help // Locking! - if((!isSendingTrailerChunk) && (!finalized.state())) { + if((!trailerSendState.isSendingTrailerChunk()) && (!finalized.state())) { if(sentPacket == null) Core.logger.log(this, "sentPacket NULL! for "+this, new Exception("debug"), Logger.ERROR); @@ -1160,40 +1295,24 @@ } registerDataSent(size); - if (isSendingTrailerChunk) { - if(logDEBUG) - logDEBUG("jobDone sending chunk"); - if(size == sendingTrailerChunkBytes) { - synchronized(trailerSendLock) { - trailerSentBytes += size; - isSendingTrailerChunk = false; - sendingTrailerChunkBytes = 0; - } - if(twcb != null) { - if(logDEBUG) logDEBUG("Calling "+twcb+".written()"); - twcb.written(); - } - if(trailerSentBytes < trailerSendLength) + if (trailerSendState.isSendingTrailerChunk()) { + try { + synchronized (trailerSendState) { + trailerSendState.registerChunkWritten(size); + if (trailerSendState.hasTrailerDataRemaining()) return; // wait for next write // else we can send another packet - } else { - if(logMINOR) Core.logger.log(this, "Trailer chunk send failed for "+this+ - ", closing", Logger.MINOR); + } + } catch (TrailerSendState.PartialChunkSentException e) { + if (logMINOR) + Core.logger.log(this, "Trailer chunk send failed for " + this +", closing", Logger.MINOR); synchronized(sendLock) { sendClosed.change(true); sendLock.notifyAll(); } ocm.markClosed(this); - TrailerWriteCallback cb; - synchronized(trailerSendLock) { - isSendingTrailerChunk = false; - sendingTrailerChunkBytes = 0; - trailerSentBytes = 0; - trailerSendID = -1; - cb = twcb; - twcb = null; - } - if(cb != null) cb.closed(); + trailerSendState.reset(); + return; // closed conn, don't need another packet } } else { @@ -1216,19 +1335,9 @@ if(packet.hasTrailer()) { if(logDEBUG) logDEBUG("packet has trailer"); - int sendId; - synchronized(trailerSendIDCounterLock) { - trailerSendIDCounter++; - if(trailerSendIDCounter < 0) - 
trailerSendIDCounter = 0; - sendId = trailerSendIDCounter; - } - synchronized(trailerSendLock) { - trailerSendID = sendId; - trailerSendLength = packet.trailerLength(); - trailerSentBytes = 0; - } - tw = new MyTrailerWriter(sendId); + + tw = new MyTrailerWriter(trailerSendState.startNew(packet.trailerLength())); + if(logDEBUG) logDEBUG("Creating "+tw); } @@ -1249,13 +1358,13 @@ boolean needTerminate = false; // don't terminate while holding locks! if(sendClosed.state() && !sendingCloseMessage) { if(receiveClosed.state() && receivingCount == 0 && - sendClosed.state() && trailerSendID == -1) { + sendClosed.state() && !trailerSendState.isSendingTrailer()) { logDEBUG("Terminating in jobDone"); terminate(); } return; } - if(trailerSendID != -1) return; + if(trailerSendState.isSendingTrailer()) return; if(logDEBUG) logDEBUG("Trying to send a packet..."); // This is nasty... @@ -1269,7 +1378,7 @@ // But if we check trailerSendID right after relocking it, we're ok PeerPacket mySentPacket = null; synchronized(sentPacketLock) { - if(trailerSendID != -1) return; + if(trailerSendState.isSendingTrailer()) return; if(sentPacket == null) { if(logDEBUG) logDEBUG("synchronized..."); if(sendingCloseMessage) { @@ -1331,7 +1440,7 @@ } needTerminate = receiveClosed.state() && receivingCount == 0 && - sendClosed.state() && trailerSendID == -1; + sendClosed.state() && !trailerSendState.isSendingTrailer(); } if(needTerminate) { logDEBUG("Terminating in jobDone (B)"); @@ -1347,6 +1456,7 @@ Main.node.scheduleConnectionOpener(identity); } + //Called for accouting purposes only private void registerDataSent(int size) { synchronized(sendLock) { @@ -1400,37 +1510,18 @@ TrailerWriteCallback cb) throws UnknownTrailerSendIDException, TrailerSendFinishedException, AlreadySendingTrailerChunkException, IOException { lastActiveTime = System.currentTimeMillis(); - synchronized(trailerSendLock) { - twcb = null; - if(logDEBUG) - Core.logger.log(this, "writeTrailing("+id+",byte[],"+ - 
offset+","+length+","+cb+" on "+this+ - " (id = "+trailerSendID+")", - new Exception("debug"), Logger.DEBUG); + synchronized(trailerSendState) { if(finalized.state()) throw new IOException("Closed: "+finalized.state()+":"+this); - if(trailerSendID != id) - throw new UnknownTrailerSendIDException(); - if(trailerSentBytes >= trailerSendLength) { - trailerSendID = -1; - throw new TrailerSendFinishedException(); - } - if(isSendingTrailerChunk) { - throw new AlreadySendingTrailerChunkException(); - } - if(length <= 0) - throw new IllegalArgumentException(); - sendingTrailerChunkBytes = length; - isSendingTrailerChunk = true; + trailerSendState.registerStartChunkWrite(id, offset,length,cb); Link l = link; if(l == null) { IOException e = new IOException("Connection closed in trailer send!"); - Core.logger.log(this, "Oops: "+e+" ("+this+")", e, - Logger.NORMAL); + Core.logger.log(this, "Oops: " + e + " (" + this +")", e, Logger.NORMAL); closeTrailer(id); } - twcb = cb; + // Send the data Core.diagnostics.occurrenceCounting("outputBytesTrailingAttempted", length); sendBytes(block, offset, length, WriteSelectorLoop.TRAILER); @@ -1440,34 +1531,10 @@ public void closeTrailer(int id) { // Inspired by SOS.done() trailingPresent = false; // atomic - if(logDEBUG) - logDEBUG("closeTrailer("+id+"): "+twcb, true); - synchronized(trailerSendLock) { - if(isSendingTrailerChunk) { - Core.logger.log(this, "Closing trailer "+id+" on "+this+ - " while still sending trailer chunk!", - new Exception("debug"), - Logger.ERROR); - } - if(trailerSendID == id) { - if(trailerSentBytes != trailerSendLength) { - Core.logger.log(this, "Called closeTrailer, when only sent "+ - trailerSentBytes+" of "+trailerSendLength+ - " ("+this+")", new Exception("hrrm"), - Logger.MINOR); - } - trailerSendID = -1; - trailerSentBytes = 0; - trailerSendLength = 0; - twcb = null; - } else if (trailerSendID != -1) { - Core.logger.log(this, "Closing trailer "+id+" when handling "+ - trailerSendID+"! 
("+this+")", - new Exception("hrrm"), Logger.ERROR); - } - } + trailerSendState.closeTrailerSend(id); + if (receiveClosed.state() && receivingCount == 0 - && sendClosed.state() && trailerSendID == -1) { + && sendClosed.state() && !trailerSendState.isSendingTrailer()) { if(logDEBUG) logDEBUG("terminating in closeTrailer"); terminate(); } @@ -2137,7 +2204,7 @@ try { synchronized(sentPacketLock) { if(sentPacket != null) return false; - if(trailerSendID != -1) { + if(trailerSendState.isSendingTrailer()) { if(logDEBUG) logDEBUG("forceSendPacket("+ppm+ ") called but sending a trailer!"); @@ -2153,8 +2220,7 @@ return innerSendPacket(sentPacket.priority()); else return false; } catch (Throwable t) { - Core.logger.log(this, "Caught "+t+" in forceSendPacket", t, - Logger.ERROR); + Core.logger.log(this, "Caught " + t + " in forceSendPacket", t, Logger.ERROR); return false; } } @@ -2175,7 +2241,7 @@ if(sentPacket != null) { return false; } - if(trailerSendID != -1) { + if(trailerSendState.isSendingTrailer()) { if(logDEBUG) logDEBUG("sendPacket("+packet+","+prio+ " called but sending a trailer!"); @@ -2207,7 +2273,7 @@ } byte[] toSend = sp.getBytes(); // extra paranoia - if(trailerSendID != -1) { + if(trailerSendState.isSendingTrailer()) { if(logDEBUG) logDEBUG("innerSendPacket("+prio+","+sp+ " called but sending a trailer!"); @@ -2363,10 +2429,8 @@ if(logDEBUG) logDEBUG("Logging stats: connectionLifeTime="+connectionLifetime+ ", messages="+messages); - Core.diagnostics.occurrenceContinuous("connectionLifeTime", - connectionLifetime); - Core.diagnostics.occurrenceContinuous("connectionMessages", - messages); + Core.diagnostics.occurrenceContinuous("connectionLifeTime", connectionLifetime); + Core.diagnostics.occurrenceContinuous("connectionMessages", messages); } // Need to release sendLock first... 
if(logDEBUG) logDEBUG("notified CHOS in terminate()"); @@ -2411,16 +2475,7 @@ if(logDEBUG) logDEBUG("Closed CHIS in terminate()"); link = null; conn = null; - TrailerWriteCallback cb; - synchronized(trailerSendLock) { - isSendingTrailerChunk = false; - sendingTrailerChunkBytes = 0; - trailerSentBytes = 0; - trailerSendID = -1; - cb = twcb; - twcb = null; - } - if(cb != null) cb.closed(); + trailerSendState.reset(); if(peerHandler != null) { peerHandler.unregisterConnectionHandler(this); } else { @@ -2454,7 +2509,7 @@ */ public final long idleTime() { //return (sending.count() > 0 || receiving.count() > 0 ? - return (trailerSendID != -1 || receivingCount > 0 ? + return (trailerSendState.isSendingTrailer() || receivingCount > 0 ? 0 : System.currentTimeMillis() - lastActiveTime); } @@ -2472,7 +2527,7 @@ [EMAIL PROTECTED] whether the connection is currently sending something */ public final boolean sending() { - return trailerSendID != -1; + return trailerSendState.isSendingTrailer(); } /** @@ -2863,7 +2918,7 @@ public String toString() { return super.toString()+" for "+conn+","+link+", sending "+sentPacket+ - ":"+trailerSendID+" for "+peerHandler; + ":"+trailerSendState.toString()+" for "+peerHandler; } } _______________________________________________ cvs mailing list [EMAIL PROTECTED] http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs