Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv23996/src/freenet
Modified Files:
ConnectionHandler.java PeerHandler.java PeerPacket.java
Version.java
Log Message:
6252
- Change NGRouting discard algorithm: Don't compare by successes if the node you are
comparing to was not created yet at the time of your last success.
-- Keep original creation time for each node's estimator in the RT. Age is displayed
on the node page.
- Make PeerPacket calculate message priorities
-- QueryRejected messages don't count at all
-- Identify messages count for four
-- Request messages count for two
-- Being in the RT counts for one
-- We start at priority MESSAGE+1, i.e. trailer priority.
- Fix sending of first packet for a while on conns: we were sending single-message
packets even when more messages were queued, thus wasting the first packet.
-- Add ConnectionHandler.forceSendPacket() - sends a packet which it gets from the
PeerHandler
- Remove most synchronization from NGRT.route(). Add minimal synchronization at a lower
level in StandardNodeEstimator, DecayingRunningAverage etc.
- Delete some unused code from ConnectionHandler, most of it already commented out
- Fix KeyCollisionException in StoreIOException error
- Fix bug in maxConnDefault for Mac OS X
- Catch race in ASL without throwing
- Logging etc
Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.185
retrieving revision 1.186
diff -u -w -r1.185 -r1.186
--- ConnectionHandler.java 16 Oct 2003 01:03:57 -0000 1.185
+++ ConnectionHandler.java 17 Oct 2003 01:43:28 -0000 1.186
@@ -418,8 +418,7 @@
logDEBUG("Caught "+e+" in getPacket in registerOCM");
}
if(sentPacket != null)
- innerSendPacket(wsl.MESSAGE - 1 - sentPacket.countMessages(),
- sentPacket);
+ innerSendPacket(sentPacket.priority(), sentPacket);
if(receiveClosed.state() && receivingCount <= 0 &&
sendClosed.state() && trailerSendID == -1) {
logDEBUG("terminating at end of registerOCM");
@@ -1001,51 +1000,6 @@
boolean checkingSending = false;
Object checkingSendingLock = new Object();
- /**
- * Check that we are in fact reallySending, and if the flag is incorrect,
- * firstly, correct it, and secondly, log a big fat error message.
- * Will get false positives if called within *sendMessage*, so don't
- * use it in OpenConnectionManager.
- */
-// public void checkReallySending() {
-// if(!doCheckReallySending) return;
-// synchronized(checkingSendingLock) {
-// if(checkingSending) return;
-// checkingSending = true;
-// }
-// try {
-// synchronized(sendLock) {
-// for(Iterator i = sentMessages.iterator();i.hasNext();)
{
-// MessageSend ms = (MessageSend)(i.next());
-// if(ms.hasTrailing) {
-// Core.logger.log(this, ms.toString()+
-// " on
sentMessages has trailing for "+
-// this,
Logger.DEBUG);
-// return;
-// }
-// }
-// for(Iterator i = sendingQueue.iterator();i.hasNext();)
{
-// MessageSend ms = (MessageSend)(i.next());
-// if(ms.hasTrailing) {
-// Core.logger.log(this, ms.toString()+
-// " on
sendingQueue has trailing for "+
-// this,
Logger.DEBUG);
-// return;
-// }
-// }
-// if(trailerSendID != -1) {
-// Core.logger.log(this, "We have a sender",
-// Logger.DEBUG);
-// return;
-// }
-// Core.logger.log(this, "TRAILING PRESENT FLAG INCORRECT
FOR "+this,
-// Logger.ERROR);
-// }
-// } finally {
-// checkingSending = false;
-// }
-// }
-
public void jobDone(int size, boolean status) {
lastActiveTime = System.currentTimeMillis();
boolean logMINOR = Core.logger.shouldLog(Logger.MINOR, this);
@@ -1060,7 +1014,6 @@
//tell everybody they failed
//this is where the PeerHandler will really help
// Locking!
- MessageSend[] msgs = null;
if((!sendingTrailerChunk) && (!finalized.state())) {
if(sentPacket == null)
Core.logger.log(this, "sentPacket NULL! for
"+this,
@@ -1217,13 +1170,13 @@
}
if (mySentPacket != null) {
- if(mySentPacket != sentPacket)
+ if(mySentPacket != sentPacket) {
+ if(!finalized.state())
Core.logger.log(this, "mySentPacket = "+mySentPacket+
", but sentPacket =
"+sentPacket+" ("+
this+")",
Logger.ERROR);
- else {
- innerSendPacket(wsl.MESSAGE + 1 -
sentPacket.countMessages(),
- sentPacket);
+ } else {
+ innerSendPacket(sentPacket.priority(), sentPacket);
int sentPacketLength = mySentPacket.getLength();
int sentPacketMessages = mySentPacket.countMessages();
if(!sendClosed.state()) {
@@ -2026,195 +1979,31 @@
long lastArrival = System.currentTimeMillis();
long lastArrivalNoQR = System.currentTimeMillis();
- public final class MessageSend {
- final Message m;
- final RawMessage raw;
- SendFailedException sfe;
- final boolean hasTrailing;
- final byte[] toSend;
- boolean done = false;
- boolean success = false;
- final long startTime;
- final long startTimeNoQR;
- long sendStartTime;
- long sendStartTimeNoQR;
- MessageSendCallback cb;
- int priority = wsl.MESSAGE;
-
- // Call synchronized(sendLock)
- MessageSend(Message m, RawMessage raw, byte[] b,
- MessageSendCallback cb) throws
SendFailedException {
- this.m = m;
- this.raw = raw;
- this.cb = cb;
- sfe = null;
- toSend = b;
- hasTrailing = (raw.trailingFieldLength > 0);
- startTime = System.currentTimeMillis();
- Core.diagnostics.occurrenceContinuous
- ("messageSendInterarrivalTime",
- System.currentTimeMillis() - lastArrival);
- lastArrival = System.currentTimeMillis();
-
- //FIXME: implement m.doWeCare for more generic counting
- if (m.getMessageName().compareTo("QueryRejected") !=0 ) {
- Core.diagnostics.occurrenceContinuous
- ("messageSendInterarrivalTimeNoQR",
- System.currentTimeMillis() - lastArrivalNoQR);
- lastArrivalNoQR = System.currentTimeMillis();
-
- startTimeNoQR = lastArrivalNoQR;
- } else
- startTimeNoQR = 0;
-
- }
-
- void onStart() {
- sendStartTime = System.currentTimeMillis();
- if (m.getMessageName().compareTo("QueryRejected") !=0 )
- sendStartTimeNoQR = sendStartTime;
- }
-
- void start() throws SendFailedException {
- try {
- Core.logger.log(this, "Starting to send "+this+" for "+
-
ConnectionHandler.this, Logger.DEBUG);
- sendBytes(toSend, 0, toSend.length, wsl.MESSAGE);
- onStart();
- } catch (IOException e) {
- Core.logger.log(this, "Got IOException trying to
sendMessage: "+this+" ("+ConnectionHandler.this+")", e, Logger.DEBUG);
- //needToRemoveFromOCM = true;
- // Keep it in the OCM. It will be terminated
eventually.
- synchronized(sendLock) {
- sendClosed.change(true);
- sendLock.notifyAll();
- }
- // terminal failure
- String s = e.getMessage();
- if(s != null) s = s.getClass().getName()+": "+s;
- if(s != null) {
- throw new
SendFailedException(peer.getAddress(), null,s, true);
- //throw sfe;
- }
- else {
- throw new
SendFailedException(peer.getAddress(), true);
- //throw sfe;
- }
- }
- }
-
- void closed() {
- logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
- if(logDEBUG) logDEBUG("MessageSend closed(): "+this);
- if(!done) jobDone(0, false);
- }
-
- // Override this to get called when done
- public void notifyDone() {
- synchronized(this) {
- this.notifyAll();
- }
- //log the total time in queue
- long t = System.currentTimeMillis() - startTime;
- if(sfe != null) success = false;
- Core.diagnostics.occurrenceBinomial("messageSuccessRatio", 1,
-
success ? 1 : 0);
- if(success) {
-
Core.diagnostics.occurrenceContinuous("messageSendTime", t);
- if(identity != null &&
Main.node.rt.references(identity))
- Core.diagnostics.
-
occurrenceContinuous("messageSendTimeRT", t);
- else
- Core.diagnostics.
-
occurrenceContinuous("messageSendTimeNonRT", t);
- Core.logger.log(this, "messageSendTime "+t+" for
"+this+
- " for
"+ConnectionHandler.this,
- t>20000 ?
Logger.NORMAL : Logger.MINOR);
- if (m.getMessageName().compareTo("QueryRejected") !=0
) {
- long t2 = System.currentTimeMillis() -
startTimeNoQR;
-
Core.diagnostics.occurrenceContinuous("messageSendTimeNoQR", t2);
- }
-
- //log only the time it took to send
- if(sendStartTime > 0) {
- t = System.currentTimeMillis() - sendStartTime;
- if (shouldThrottle()){
-
Core.diagnostics.occurrenceContinuous("messageSendServiceTime",t);
- Core.logger.log(this,
"messageSendServiceTime "+t+" for "+
- this+"
for "+ConnectionHandler.this,
-
t>20000 ? Logger.NORMAL : Logger.MINOR);
- if
(m.getMessageName().compareTo("QueryRejected") !=0 )
-
Core.diagnostics.occurrenceContinuous("messageSendServiceTimeNoQR", t);
- }
- }
- if(cb != null) cb.succeeded();
- } else {
- if(sfe == null)
- Core.logger.log(this, "Failed but SFE null!:
"+this+" ("+
-
ConnectionHandler.this+")", Logger.NORMAL);
- if(cb != null) cb.thrown(sfe);
- }
- //System.out.println("notified MessageSend");
- }
-
- void jobDone(int size, boolean status) {
- logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
- try {
- if(size == toSend.length) {
- success = true;
- if(logDEBUG) logDEBUG("Successfully sent
message "+this);
- messages++;
- } else {
- sfe = new
SendFailedException(peer.getAddress(), true);
- Core.logger.log(this, "Send failed for
"+this+" ("+
-
ConnectionHandler.this+") in jobDone("+
-
size+","+status+") for "+toSend.length, sfe,
- Logger.MINOR);
- }
- if(logDEBUG) logDEBUG("MessageSend finished sending
message "+this);
- done = true;
- } finally {
- // only if there is a trailing field to write do we not
- // decrement sendingCount and notify() immediately
- if(logDEBUG) logDEBUG("in jobDone finally{} "+this);
+ /**
+ * Force the ConnectionHandler to start sending a packet
+ * @returns true if we sent a packet, false if we were already sending
+ * one.
+ * @throws nil. Must log an error and return false if it was going to
+ * throw.
+ * Called by PeerHandler.innerSendMessageAsync
+ */
+ public final boolean forceSendPacket(PeerPacketMessage ppm) {
try {
- if (!hasTrailing) {
- //sending.decCount(); // done
- synchronized(sendLock) {
-
- if(logDEBUG)
logDEBUG("Receiving "+receivingCount+
-
" messages, send id: "+
-
trailerSendID+
-
", receiveClosed="+
-
receiveClosed.state()+
-
", sendClosed="+
-
sendClosed.state()+" ("+
-
this+")");
- if (receiveClosed.state() &&
receivingCount == 0
- && sendClosed.state()
&& trailerSendID == -1) {
- if(logDEBUG)
logDEBUG("terminating");
- terminate();
- }
- sendLock.notifyAll();
- if(logDEBUG)
logDEBUG("notified");
- }
- }
-// if (needToRemoveFromOCM) {
-// if(logDEBUG) logDEBUG("Need to remove
from OCM");
-// // Not synced, this is called directly
on the WSL thread
-// needToRemoveFromOCM = false;
-// if(logDEBUG) logDEBUG("Removed from
OCM");
-// }
+ synchronized(sentPacketLock) {
+ if(sentPacket != null) return false;
+ sentPacket = peerHandler.getPacket(link, p, identity,
ppm,
+
false);
+ if(logDEBUG)
+ Core.logger.log(this, "Sending "+sentPacket+"
on "+this+
+
".forceSendPacket()", Logger.DEBUG);
+ }
+ if(sentPacket != null)
+ return innerSendPacket(sentPacket.priority(),
sentPacket);
+ else return false;
} catch (Throwable t) {
- Core.logger.log(this, "Caught throwable "+t+"
in "+
-
ConnectionHandler.this+"."+this+"."+
- "jobDone",t,
Logger.ERROR);
- }
- // notify MUST get called no matter what
- if(logDEBUG) logDEBUG("Notifying "+this);
- notifyDone();
- if(logDEBUG) logDEBUG("Notified "+this);
- }
+ Core.logger.log(this, "Caught "+t+" in forceSendPacket", t,
+ Logger.ERROR);
+ return false;
}
}
@@ -2256,7 +2045,7 @@
* Do NOT call while synchronized on sentPacketLock - hence the argument
* @param sentPacket the packet to send
*/
- protected void innerSendPacket(int prio, PeerPacket sentPacket) {
+ protected boolean innerSendPacket(int prio, PeerPacket sentPacket) {
lastActiveTime = System.currentTimeMillis();
PeerPacket sp = sentPacket;
byte[] toSend = sp.getBytes();
@@ -2271,6 +2060,7 @@
}
sentPacket = null;
sp.jobDone(true, 0, peer, null);
+ return false;
} catch (Throwable t) {
Core.logger.log(this, "Caught "+t+ " trying to send packet "+
sentPacket, t, Logger.ERROR);
@@ -2280,7 +2070,9 @@
}
sentPacket = null;
sp.jobDone(true, 0, peer, null);
+ return false;
}
+ return true;
}
// boolean needToRemoveFromOCM = false;
@@ -2301,311 +2093,6 @@
htl);
}
-// public MessageSend sendMessageAsync(Message m)
-// throws SendFailedException {
-// return sendMessageAsync(m, null);
-// }
-
-// public MessageSend sendMessageAsync(Message m, MessageSendCallback cb)
-// throws SendFailedException {
-// RawMessage raw = m.toRawMessage(p);
-// synchronized(this) {
-// if (conn == null || trailingPresent)
-// //this should not be set when entering new message
-// throw new SendFailedException(peer.getAddress(),
false);
-// return innerSendMessageAsync(m, raw, cb);
-// }
-// }
-
-// public void unsendMessageAsync(MessageSendCallback cb) {
-// if(sendClosed.state()) return;
-// synchronized(sendLock) {
-// for(Iterator i=sendingQueue.listIterator(0);i.hasNext();) {
-// MessageSend m = (MessageSend)(i.next());
-// if(m.cb == cb) {
-// if(m.hasTrailing)
-// trailingPresent = false; // we hope!
-// i.remove();
-// sendingCount--;
-// synchronized(sendingQueueBytesLock) {
-// sendingQueueBytes -= m.toSend.length;
-// if (logDEBUG)
-// Core.logger.log(this, "Removed
(E) message of "+
-//
"size "+m.toSend.length+" to "+
-//
"sendingQueue: now "+
-//
sendingQueueBytes+" bytes for "+
-//
this, Logger.DEBUG);
-// }
-// return; // Hopefully there isn't more than
one...
-// }
-// }
-// }
-// }
-
-// protected MessageSend innerSendMessageAsync(Message m, RawMessage raw,
-//
MessageSendCallback cb)
-// //no other way really
-// throws SendFailedException {
-// logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
-// try {
-// if (sendClosed.state())
-// throw new SendFailedException(peer.getAddress(),
false);
-// // needToRemoveFromOCM = false;
-// synchronized(sendLock) {
-// // Do not remove sendLock without seeing comments below
-// if(trailingPresent)
-// checkReallySending();
-// if (sendClosed.state())
-// // check again when synced
-// throw new
SendFailedException(peer.getAddress(), false);
-// if(trailerSendID != -1)
-// throw new
SendFailedException(peer.getAddress(), false);
-// MessageSend ms = null;
-// try {
-// sendingCount++;
-// if (raw.trailingFieldLength > 0) {
-// trailingPresent=true;
-// if (logDEBUG)Core.logger.log(this,
"enqueing trailer, queue size " +
-//
sendingQueue.size()+" on "+this,
-//
Logger.DEBUG);
-// }
-// if(identity != null &&
-// ocm.needsConnection(identity))
-//
Main.node.scheduleConnectionOpener(identity);
-// //waitForOneSender();
-// // if we were just waiting for the connection
to become free..
-// if(m == null) return null;
-// if(raw.close) {
-// //needToRemoveFromOCM = true;
-// logDEBUG("Closing because message says
so");
-// sendClosed.change(true);
-// //} else if (raw.sustain) {
-// // persist.change(true);
-// // hm, we only want to persist if we
*receive*
-// // a sustain message, right?
-// // sendingCount = 1 - it will get
closed after it finishes
-// }
-// if(logDEBUG) logDEBUG(sendingCount + "
messages in sendqueue");
-
-// if(logDEBUG) logDEBUG("Sending RawMessage: " +
raw, true);
-
-// lastSizeDone = 0;
-// // WATCHME
-// if (Main.watchme != null) {
-// logWatchme(m, raw);
-// }
-
-// // users are fond of this log entry
-// if (Core.logger.shouldLog(Logger.MINOR,this))
-// Core.logger.log(this,
-//
raw.messageType + " @ " +
-//
Long.toHexString(m.id())
-// + " ->
" + peer.getAddress(),
-//
Logger.MINOR);
-
-// // Now send the message
-// ByteArrayOutputStream bais = new
ByteArrayOutputStream();
-// Link l = link;
-// if(l == null) return null;
-// OutputStream os = l.makeOutputStream(bais);
-// // We are synchronized on sendLock, so we do
not need
-// // to lock os
-// try {
-// raw.writeMessage(os);
-// os.flush();
-// } catch (IOException e) {
-// e.printStackTrace();
-// Core.logger.log(this, "Impossible
exception for "+
-//
ConnectionHandler.this+": "+e, e,
-//
Logger.ERROR);
-// throw new
IllegalStateException("Impossible exception!: "+
-//
e);
-// }
-// byte[] b = bais.toByteArray();
-// //moved the enqueing after the crypto
operations
-// //synchronized(sendingQueue) { //lock the
entire enqueueing
-
-// ms = new MessageSend(m, raw, b, cb);
-
-// if(logDEBUG) logDEBUG("Got MessageSend "+ms+":
"+b.length+" bytes");
-// //}// end of monitor
-// if (sendingQueue.size()==0 &&
sentMessages.size() == 0) {
-// try {
-// if (logDEBUG)
-//
Core.logger.log(this,"executing messageSend "+
-//
"immediately ("+this+","+ms+")",
-//
Logger.DEBUG);
-// sentMessages.add(ms);
-// sentMessagesCount = 1;
-// if(m instanceof Identify)
-// ms.priority =
wsl.NEGOTIATION-1;
-// // Very important message!
-
-// ms.start();
-// } catch (SendFailedException e) {
-// Core.logger.log(this, "Got
IOException trying to "+
-//
"sendMessage: "+this, e,
-//
Logger.MINOR);
-// ms = null;
-// throw e;
-// }
-// } else {
-// sendingQueue.add(ms);
-// Core.logger.log(this, "Added (D)
message of size "+
-//
b.length+" to sendingQueue: now "+
-//
sendingQueue.size()+" for "+this,
-//
Logger.DEBUG);
-// synchronized(sendingQueueBytesLock) {
-// sendingQueueBytes += b.length;
-// Core.logger.log(this, "Added
(D) message of "+
-//
"size "+b.length+" to "+
-//
"sendingQueue: now "+
-//
sendingQueueBytes+
-//
" bytes for "+this,
-//
Logger.DEBUG);
-// }
-// if (logDEBUG)
-//
Core.logger.log(this,"scheduling ms on queue."+
-//
" size now "+sendingQueue.size(),
-//
Logger.DEBUG);
-// incSendQueue(b.length +
raw.trailingFieldLength);
-// if(logDEBUG) logDEBUG("Started
MessageSend: "+ms);
-// }
-// return ms;
-// } finally {
-// // only if there is a trailing field to write
do we not
-// // decrement sendingCount and notify()
immediately
-// // Can't possibly have started moving trailing
if we have an error here
-// if(logDEBUG) logDEBUG("in sendMessageAsync
finally{}");
-// //sending.decCount(); // done
-// if(ms == null) {
-// --sendingCount;
-// if(logDEBUG) logDEBUG("send == null");
-// if(logDEBUG) logDEBUG("Receiving
"+receivingCount+
-// " messages, sending
"+sendingCount+
-// " messages,
receiveClosed="+
-//
receiveClosed.state()+", sendClosed="+
-// sendClosed.state()+"
("+this+")");
-// if (receiveClosed.state() &&
receivingCount == 0
-// && sendClosed.state() &&
sendingCount == 0) {
-// if(logDEBUG)
logDEBUG("terminating");
-// terminate();
-// }
-// sendLock.notifyAll();
-// if(logDEBUG) logDEBUG("notified");
-// }
-// }
-// }
-// } finally {
-// tcpConnection c = conn;
-// if(c != null) {
-// if(c.isClosed()) {
-// terminate();
-// }
-// // else if (needToRemoveFromOCM) {
-// // if(logDEBUG) logDEBUG("Need to remove from
OCM");
-// // removeFromOCM();
-// // needToRemoveFromOCM = false;
-// // if(logDEBUG) logDEBUG("Removed from OCM");
-// // }
-// }
-// }
-// }
-
-// /** Blocking send of a message
-// * @return a TrailerWriter for the attached trailing field if there is one,
null otherwise
-// */
-// public TrailerWriter sendMessage(Message m)
-// throws SendFailedException {
-// MessageSend ms;
-// RawMessage raw;
-// long startedsend=0;
-// TrailerWriter send = null;
-// synchronized(this) { //extend locking even further
-// if (conn == null || trailingPresent) //this should not be set
when entering new message
-// throw new SendFailedException(peer.getAddress(),
false);
-// if(logDEBUG) logDEBUG("SendMessage("+m+")", true);
-// raw = m.toRawMessage(p);
-// if(logDEBUG) logDEBUG("raw is "+raw);
-// ms = innerSendMessageAsync(m, raw, null);
-// if(logDEBUG) logDEBUG("sendMessageAsync called");
-// }
-// synchronized(ms) {
-// while(!ms.done) {
-// try {
-// if(logDEBUG) logDEBUG("Waiting for MessageSend
"+ms);
-// startedsend=System.currentTimeMillis();
-// ms.wait(5*60*1000); //FIXME:hardcoded 5 minutes
-// } catch (InterruptedException e) {
-// throw new
SendFailedException(peer.getAddress(), true);
-// };
-// if (System.currentTimeMillis() - startedsend >=
5*60*1000)
-// throw new
SendFailedException(peer.getAddress(), true);
-// }
-// if(logDEBUG) logDEBUG("Waited for MessageSend");
-// }
-// int sendId = -1;
-// if(ms.sfe != null) {
-// Core.logger.log(this, "MessageSend produced
SendFailedException "+
-// " for "+this+": "+ms.sfe,
ms.sfe, Logger.MINOR);
-// throw ms.sfe;
-// }
-// boolean knowAboutTrailing = false;
-// if(ms.success) {
-// if(logDEBUG) logDEBUG("Success!");
-// try {
-// if (ms.hasTrailing) {
-// // has trailing
-// knowAboutTrailing = true;
-// if(logDEBUG) logDEBUG("Has trailing");
-// // OutputStream os = new CHOutputStream();
-// Link l = link;
-// if(l == null) throw new IOException("Closed in
middle of send");
-
-// // os = l.makeOutputStream(os);
-// synchronized(trailerSendIDCounterLock) {
-// trailerSendIDCounter++;
-// if(trailerSendIDCounter < 0)
trailerSendIDCounter = 0;
-// sendId = trailerSendIDCounter;
-// }
-// synchronized(trailerSendLock) {
-// trailerSendID = sendId;
-// trailerSendLength =
ms.raw.trailingFieldLength;
-// trailerSentBytes = 0;
-// }
-// send = new MyTrailerWriter(sendId);
-// if(logDEBUG) logDEBUG("Started
MyTrailerWriter("+sendId+"): "+send);
-// } else {
-// if(logDEBUG) logDEBUG("Does not have
trailing");
-// // does not
-// //Core.logger.log(this, "Message sent.",
Logger.DEBUG);
-// Core.randSource.acceptTimerEntropy(sendTimer);
-// lastActiveTime = System.currentTimeMillis();
-// }
-// } catch (IOException e) {
-// Core.logger.log(this, "Got IOException trying to
sendMessage: "+this, e, Logger.DEBUG);
-// tcpConnection c = conn;
-// if(c != null) {
-// if(c.isClosed()) {
-// terminate();
-// }// else {
-// // removeFromOCM();
-// // }
-// } // else already closed
-// synchronized(sendClosed) {
-// sendClosed.change(true);
-// }
-// // terminal failure
-// throw new SendFailedException(peer.getAddress(), true);
-// }
-// }
-// if(logDEBUG) logDEBUG("Returning "+send);
-// return send;
-// }
-
-
-
-
// IMPORTANT: Don't call this while holding sendLock
// or receiveLock or you will cause a
// deadlock.
@@ -2650,7 +2137,7 @@
sendingCloseMessage = false;
}
}
- innerSendPacket(wsl.MESSAGE, sentPacket);
+ innerSendPacket(sentPacket.priority(), sentPacket);
if(logDEBUG)
logDEBUG("Sent close packet");
}
@@ -3248,8 +2735,8 @@
// }
public String toString() {
- return super.toString()+" for "+conn+","+link+", sending
"+sentPacket+":"+
- trailerSendID;
+ return super.toString()+" for "+conn+","+link+", sending "+sentPacket+
+ ":"+trailerSendID+" for "+peerHandler;
}
}
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- PeerHandler.java 16 Oct 2003 01:03:58 -0000 1.22
+++ PeerHandler.java 17 Oct 2003 01:43:28 -0000 1.23
@@ -46,6 +46,7 @@
int totalOutboundFailures = 0;
boolean logDEBUG;
+ boolean inRT = false;
int maxPacketSize;
@@ -188,7 +189,7 @@
public NodeReference updateReference(NodeReference nr) {
if(logDEBUG)
Core.logger.log(this, "updateReference("+nr+") on "+this,
- Logger.DEBUG);
+ new Exception("debug"),
Logger.DEBUG);
if(nr.supersedes(ref)) {
if(logDEBUG)
Core.logger.log(this, "superceded old ref "+ref+" on
"+this,
@@ -221,7 +222,7 @@
boolean inRT = false;
if(weak) {
quitNow = messages.isEmpty() && messagesWithTrailers.isEmpty()
&&
- (id == null || (!(inRT = node.rt.references(id))));
+ !(inRT = !(id == null || !node.rt.references(id)));
if(quitNow && connectionHandlers.isEmpty()) {
if(Core.logger.shouldLog(Logger.DEBUG, this))
Core.logger.log(this, "returning false
immediately",
@@ -343,6 +344,7 @@
messagesWithTrailers = new LinkedList();
connectionHandlers = new LinkedList();
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
+ inRT = (id == null || !(node.rt.references(id)));
}
public long timeSinceLastMessageSent() {
@@ -372,6 +374,7 @@
}
protected void innerSendMessageAsync(PeerPacketMessage pm) {
+ inRT = !(id == null || !node.rt.references(id));
/* Possibilities:
*
* 1. No ConnectionHandlers (without trailing fields) open.
@@ -394,6 +397,8 @@
Logger.DEBUG);
lastMessageSentTime = System.currentTimeMillis();
int handlersSendingPackets = 0;
+ while(true) {
+ ConnectionHandler freeConn = null;
synchronized(connectionHandlers) {
for(Iterator e = connectionHandlers.listIterator(0);
e.hasNext();) {
@@ -402,8 +407,9 @@
if(!ch.isOpen()) {
// Can't send messages
if(logDEBUG) Core.logger.log(this,
ch.toString()+
- " can't send
messages for "+pm+" on "+
- this,
Logger.DEBUG);
+
" can't send messages for "
+
+pm+" on "+this,
+
Logger.DEBUG);
e.remove();
continue;
}
@@ -422,16 +428,25 @@
}
// Not busy!
if(logDEBUG) Core.logger.log(this, ch.toString()+
- ": send one packet
"+pm+" for "+this,
+
": send one packet "+pm+
+
" for "+this,
Logger.DEBUG);
+ freeConn = ch;
+ break;
+ }
+ }
+ if(freeConn != null) {
try {
- if(sendSinglePacket(ch, pm)) return;
+ sendSinglePacket(freeConn, pm);
+ if(messages.isEmpty() &&
messagesWithTrailers.isEmpty())
+ return; // Otherwise there is more to
send on another conn - if we can find one
} catch (IOException ex) {
- Core.logger.log(this, ch.toString()+
+ Core.logger.log(this, freeConn.toString()+
": caught
"+ex+" trying to send packet ("+
this+")",
Logger.MINOR);
}
}
+ break;
}
// No suitable ConnectionHandlers found!
// Queue the message
@@ -494,18 +509,23 @@
* message on the connection because of a locking conflict
*/
protected boolean sendSinglePacket(ConnectionHandler ch,
-
PeerPacketMessage pm)
+
PeerPacketMessage ppm)
throws IOException {
- PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
- PeerPacket packet = new PeerPacket(msgs, ch.getLink(),
-
ch.presentationType());
- return ch.sendPacket(packet,
-
freenet.transport.WriteSelectorLoop.MESSAGE);
+ return ch.forceSendPacket(ppm);
}
public final PeerPacket getPacket(Link link, Presentation p)
throws IOException {
- return getPacket(link, p, null, null, false);
+ return getPacket(link, p, null, (PeerPacketMessage)null, false);
+ }
+
+ public PeerPacket getPacket(Link link, Presentation p,
+ Identity i, Message m,
+ boolean onlyGivenMsg)
+ throws IOException {
+ return getPacket(link, p, i,
+ new PeerPacketMessage(i, m, null,
NORMAL, this),
+ onlyGivenMsg);
}
/**
@@ -520,7 +540,7 @@
* @throws IOException if the connection is already closed
*/
public PeerPacket getPacket(Link link, Presentation p,
- Identity i, Message m,
+ Identity i,
PeerPacketMessage m,
boolean onlyGivenMsg)
throws IOException {
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
@@ -538,9 +558,11 @@
LinkedList packetMessages = new LinkedList();
if(logDEBUG)
Core.logger.log(this, "getPacket("+link+","+p+","+i+","+m+","+
- onlyGivenMsg+" on "+this,
Logger.DEBUG);
+ onlyGivenMsg+" on "+this+"
("+messages.size()+
+ " messages,
"+messagesWithTrailers.size()+
+ " messages with trailers",
Logger.DEBUG);
if(m != null) {
- packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL,
this));
+ packetMessages.add(m);
}
if(!onlyGivenMsg) {
boolean msgsEmpty = false;
@@ -590,7 +612,7 @@
PeerPacketMessage[] msgs =
new PeerPacketMessage[packetMessages.size()];
packetMessages.toArray(msgs);
- PeerPacket packet = new PeerPacket(msgs, link, p);
+ PeerPacket packet = new PeerPacket(msgs, link, p, inRT);
if(logDEBUG)
Core.logger.log(this, "Returning "+packet+" from
getPacket ("+
this+")",
Logger.DEBUG);
Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -w -r1.9 -r1.10
--- PeerPacket.java 11 Oct 2003 20:00:14 -0000 1.9
+++ PeerPacket.java 17 Oct 2003 01:43:28 -0000 1.10
@@ -3,6 +3,7 @@
import java.io.OutputStream;
import java.io.IOException;
import freenet.Core;
+import freenet.message.*;
import freenet.support.Logger;
class PeerPacket {
@@ -12,6 +13,8 @@
int sentBytes;
boolean hasTrailer = false;
boolean hasCloseMessage = false;
+ int priority;
+ boolean isInRT;
public String toString() {
return super.toString()+": "+messages.length+" msgs, "+
@@ -23,8 +26,12 @@
* Create a PeerPacket
* @throws IOException if the link is already closed
*/
- PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p)
+ PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p,
+ boolean isInRT)
throws IllegalArgumentException, IOException {
+ this.isInRT = isInRT;
+ int prio = freenet.transport.WriteSelectorLoop.MESSAGE+1;
+ if(isInRT) prio--;
sentBytes = 0;
messages = msgs;
int totalLength = 0;
@@ -39,6 +46,13 @@
}
if(m.msg.close)
hasCloseMessage = true;
+ if(!(m.msg instanceof QueryRejected))
+ prio--;
+ if(m.msg instanceof Request || m.msg instanceof DataInsert)
+ prio--; // Requests get 2 points
+ if(m.msg instanceof Identify)
+ prio-=4; // IMPORTANT to get the connection ready to go
+ // FIXME: query the messages for their priority?
}
data = new byte[totalLength];
int x = 0;
@@ -47,6 +61,7 @@
System.arraycopy(msgBytes, 0, data, x, msgBytes.length);
x += msgBytes.length;
}
+ priority = prio;
}
public int getLength() {
@@ -74,6 +89,10 @@
public int countMessages() {
return messages.length;
+ }
+
+ public int priority() {
+ return priority;
}
/**
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.444
retrieving revision 1.445
diff -u -w -r1.444 -r1.445
--- Version.java 16 Oct 2003 01:03:58 -0000 1.444
+++ Version.java 17 Oct 2003 01:43:28 -0000 1.445
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6251;
+ public static final int buildNumber = 6252;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs