Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv23996/src/freenet

Modified Files:
        ConnectionHandler.java PeerHandler.java PeerPacket.java 
        Version.java 
Log Message:
6252
- Change NGRouting discard algorithm: don't compare by successes if the node you are
comparing against had not yet been created at the time of your last success.
-- Keep original creation time for each node's estimator in the RT. Age is displayed 
on the node page.
- Make PeerPacket calculate message priorities:
-- QueryRejected messages don't count at all
-- Identify messages count for four
-- Requests count for two
-- Being in the RT counts for one
-- We start at prio message+1, i.e. trailer.
- Fix sending of first packet for a while on conns: we were sending single-message
packets even when messages were queued, thus wasting the first packet.
-- Add ConnectionHandler.forceSendPacket(), which sends a packet that it gets from the
PeerHandler
- Remove most synchronization on NGRT.route(). Add minimal synchronization at a lower
level on StandardNodeEstimator, DecayingRunningAverage, etc.
- Delete some unused code from ConnectionHandler, most of it already commented out
- Fix KeyCollisionException in StoreIOException error
- Fix bug in maxConnDefault for Mac OS X
- Catch race in ASL without throwing
- Logging etc


Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.185
retrieving revision 1.186
diff -u -w -r1.185 -r1.186
--- ConnectionHandler.java      16 Oct 2003 01:03:57 -0000      1.185
+++ ConnectionHandler.java      17 Oct 2003 01:43:28 -0000      1.186
@@ -418,8 +418,7 @@
                        logDEBUG("Caught "+e+" in getPacket in registerOCM");
                }
                if(sentPacket != null)
-                       innerSendPacket(wsl.MESSAGE - 1 - sentPacket.countMessages(),
-                                                       sentPacket);
+                       innerSendPacket(sentPacket.priority(), sentPacket);
                if(receiveClosed.state() && receivingCount <= 0 &&
                   sendClosed.state() && trailerSendID == -1) {
                        logDEBUG("terminating at end of registerOCM");
@@ -1001,51 +1000,6 @@
        boolean checkingSending = false;
        Object checkingSendingLock = new Object();
        
-       /**
-        * Check that we are in fact reallySending, and if the flag is incorrect,
-        * firstly, correct it, and secondly, log a big fat error message.
-        * Will get false positives if called within *sendMessage*, so don't
-        * use it in OpenConnectionManager.
-        */
-//     public void checkReallySending() {
-//             if(!doCheckReallySending) return;
-//             synchronized(checkingSendingLock) {
-//                     if(checkingSending) return;
-//                     checkingSending = true;
-//             }
-//             try {
-//                     synchronized(sendLock) {
-//                             for(Iterator i = sentMessages.iterator();i.hasNext();) 
{
-//                                     MessageSend ms = (MessageSend)(i.next());
-//                                     if(ms.hasTrailing) {
-//                                             Core.logger.log(this, ms.toString()+
-//                                                                             " on 
sentMessages has trailing for "+
-//                                                                             this, 
Logger.DEBUG);
-//                                             return;
-//                                     }
-//                             }
-//                             for(Iterator i = sendingQueue.iterator();i.hasNext();) 
{
-//                                     MessageSend ms = (MessageSend)(i.next());
-//                                     if(ms.hasTrailing) {
-//                                             Core.logger.log(this, ms.toString()+
-//                                                                             " on 
sendingQueue has trailing for "+
-//                                                                             this, 
Logger.DEBUG);
-//                                             return;
-//                                     }
-//                             }
-//                             if(trailerSendID != -1) {
-//                                     Core.logger.log(this, "We have a sender",
-//                                                                     Logger.DEBUG);
-//                                     return;
-//                             }
-//                             Core.logger.log(this, "TRAILING PRESENT FLAG INCORRECT 
FOR "+this,
-//                                                             Logger.ERROR);
-//                     }
-//             } finally {
-//                     checkingSending = false;
-//             }
-//     }
-       
        public void jobDone(int size, boolean status) {
                lastActiveTime = System.currentTimeMillis();
                boolean logMINOR = Core.logger.shouldLog(Logger.MINOR, this);
@@ -1060,7 +1014,6 @@
                        //tell everybody they failed 
                        //this is where the PeerHandler will really help
                        // Locking!
-                       MessageSend[] msgs = null;
                        if((!sendingTrailerChunk) && (!finalized.state())) {
                                if(sentPacket == null)
                                        Core.logger.log(this, "sentPacket NULL! for 
"+this,
@@ -1217,13 +1170,13 @@
                }
                
                if (mySentPacket != null) {
-                       if(mySentPacket != sentPacket)
+                       if(mySentPacket != sentPacket) {
+                               if(!finalized.state())
                                Core.logger.log(this, "mySentPacket = "+mySentPacket+
                                                                ", but sentPacket = 
"+sentPacket+" ("+
                                                                this+")", 
Logger.ERROR);
-                       else {
-                               innerSendPacket(wsl.MESSAGE + 1 - 
sentPacket.countMessages(),
-                                                               sentPacket);
+                       } else {
+                               innerSendPacket(sentPacket.priority(), sentPacket);
                                int sentPacketLength = mySentPacket.getLength();
                                int sentPacketMessages = mySentPacket.countMessages();
                                if(!sendClosed.state()) {
@@ -2026,195 +1979,31 @@
        long lastArrival = System.currentTimeMillis();
        long lastArrivalNoQR = System.currentTimeMillis();
        
-       public final class MessageSend {
-               final Message m;
-               final RawMessage raw;
-               SendFailedException sfe;
-               final boolean hasTrailing;
-               final byte[] toSend;
-               boolean done = false;
-               boolean success = false;
-               final long startTime;
-               final long startTimeNoQR;
-               long sendStartTime;
-               long sendStartTimeNoQR;
-               MessageSendCallback cb;
-               int priority = wsl.MESSAGE;
-               
-               // Call synchronized(sendLock)
-               MessageSend(Message m, RawMessage raw, byte[] b, 
-                                       MessageSendCallback cb) throws 
SendFailedException {
-                       this.m = m;
-                       this.raw = raw;
-                       this.cb = cb;
-                       sfe = null;
-                       toSend = b;
-                       hasTrailing = (raw.trailingFieldLength > 0);
-                       startTime = System.currentTimeMillis();
-                       Core.diagnostics.occurrenceContinuous
-                               ("messageSendInterarrivalTime", 
-                                System.currentTimeMillis() - lastArrival);
-                       lastArrival = System.currentTimeMillis();
-                       
-                       //FIXME: implement m.doWeCare for more generic counting
-                       if (m.getMessageName().compareTo("QueryRejected") !=0 ) {
-                               Core.diagnostics.occurrenceContinuous
-                                       ("messageSendInterarrivalTimeNoQR", 
-                                        System.currentTimeMillis() - lastArrivalNoQR);
-                               lastArrivalNoQR = System.currentTimeMillis();   
-                               
-                               startTimeNoQR = lastArrivalNoQR;
-                       } else
-                               startTimeNoQR = 0;
-                       
-               }
-               
-               void onStart() {
-                       sendStartTime = System.currentTimeMillis();
-                       if (m.getMessageName().compareTo("QueryRejected") !=0 ) 
-                               sendStartTimeNoQR = sendStartTime;      
-               }
-               
-               void start() throws SendFailedException {
-                       try {
-                               Core.logger.log(this, "Starting to send "+this+" for "+
-                                                               
ConnectionHandler.this, Logger.DEBUG);
-                               sendBytes(toSend, 0, toSend.length, wsl.MESSAGE);
-                               onStart();
-                       } catch (IOException e) {
-                               Core.logger.log(this, "Got IOException trying to 
sendMessage: "+this+" ("+ConnectionHandler.this+")", e, Logger.DEBUG);
-                               //needToRemoveFromOCM = true;
-                               // Keep it in the OCM. It will be terminated 
eventually.
-                               synchronized(sendLock) {
-                                       sendClosed.change(true);
-                                       sendLock.notifyAll();
-                               }
-                               // terminal failure
-                               String s = e.getMessage();
-                               if(s != null) s = s.getClass().getName()+": "+s;
-                               if(s != null) {
-                                       throw new 
SendFailedException(peer.getAddress(), null,s, true);
-                                       //throw sfe;
-                               }
-                               else  {
-                                       throw new 
SendFailedException(peer.getAddress(), true);
-                                       //throw sfe;
-                               }
-                       }
-               }
-               
-               void closed() {
-                       logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
-                       if(logDEBUG) logDEBUG("MessageSend closed(): "+this);
-                       if(!done) jobDone(0, false);
-               }
-               
-               // Override this to get called when done
-               public void notifyDone() {
-                       synchronized(this) {
-                               this.notifyAll();
-                       }
-                       //log the total time in queue
-                       long t = System.currentTimeMillis() - startTime;
-                       if(sfe != null) success = false;
-                       Core.diagnostics.occurrenceBinomial("messageSuccessRatio", 1,
-                                                                                      
         success ? 1 : 0);
-                       if(success) {
-                               
Core.diagnostics.occurrenceContinuous("messageSendTime", t);
-                               if(identity != null && 
Main.node.rt.references(identity))
-                                       Core.diagnostics.
-                                               
occurrenceContinuous("messageSendTimeRT", t);
-                               else
-                                       Core.diagnostics.
-                                               
occurrenceContinuous("messageSendTimeNonRT", t);
-                               Core.logger.log(this, "messageSendTime "+t+" for 
"+this+
-                                                               " for 
"+ConnectionHandler.this,
-                                                               t>20000 ? 
Logger.NORMAL : Logger.MINOR);
-                               if (m.getMessageName().compareTo("QueryRejected") !=0 
) {
-                                       long t2 = System.currentTimeMillis() - 
startTimeNoQR;
-                                       
Core.diagnostics.occurrenceContinuous("messageSendTimeNoQR", t2);       
-                               }
-                               
-                               //log only the time it took to send
-                               if(sendStartTime > 0) {
-                                       t = System.currentTimeMillis() - sendStartTime;
-                                       if (shouldThrottle()){
-                                               
Core.diagnostics.occurrenceContinuous("messageSendServiceTime",t);
-                                               Core.logger.log(this, 
"messageSendServiceTime "+t+" for "+
-                                                                               this+" 
for "+ConnectionHandler.this,
-                                                                               
t>20000 ? Logger.NORMAL : Logger.MINOR);
-                                               if 
(m.getMessageName().compareTo("QueryRejected") !=0 ) 
-                                                       
Core.diagnostics.occurrenceContinuous("messageSendServiceTimeNoQR", t);
-                                       }
-                               }
-                               if(cb != null) cb.succeeded();
-                       } else {
-                               if(sfe == null)
-                                       Core.logger.log(this, "Failed but SFE null!: 
"+this+" ("+
-                                                                       
ConnectionHandler.this+")", Logger.NORMAL);
-                               if(cb != null) cb.thrown(sfe);
-                       }
-                       //System.out.println("notified MessageSend");
-               }
-               
-               void jobDone(int size, boolean status) {
-                       logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
-                       try {
-                               if(size == toSend.length) {
-                                       success = true;
-                                       if(logDEBUG) logDEBUG("Successfully sent 
message "+this);
-                                       messages++;
-                               } else {
-                                       sfe = new 
SendFailedException(peer.getAddress(), true);
-                                       Core.logger.log(this, "Send failed for 
"+this+" ("+
-                                                                       
ConnectionHandler.this+") in jobDone("+
-                                                                       
size+","+status+") for "+toSend.length, sfe,
-                                                                       Logger.MINOR);
-                               }
-                               if(logDEBUG) logDEBUG("MessageSend finished sending 
message "+this);
-                               done = true;
-                       } finally {
-                               // only if there is a trailing field to write do we not
-                               // decrement sendingCount and notify() immediately
-                               if(logDEBUG) logDEBUG("in jobDone finally{} "+this);
+       /** 
+        * Force the ConnectionHandler to start sending a packet
+        * @returns true if we sent a packet, false if we were already sending
+        * one.
+        * @throws nil. Must log an error and return false if it was going to
+        * throw.
+        * Called by PeerHandler.innerSendMessageAsync
+        */
+       public final boolean forceSendPacket(PeerPacketMessage ppm) {
                                try {
-                                       if (!hasTrailing) {
-                                               //sending.decCount(); // done
-                                               synchronized(sendLock) {
-                                                       
-                                                       if(logDEBUG) 
logDEBUG("Receiving "+receivingCount+
-                                                                                      
           " messages, send id: "+
-                                                                                      
           trailerSendID+
-                                                                                      
           ", receiveClosed="+
-                                                                                      
           receiveClosed.state()+
-                                                                                      
           ", sendClosed="+
-                                                                                      
           sendClosed.state()+" ("+
-                                                                                      
           this+")");
-                                                       if (receiveClosed.state() && 
receivingCount == 0
-                                                               && sendClosed.state() 
&& trailerSendID == -1) {
-                                                               if(logDEBUG) 
logDEBUG("terminating");
-                                                               terminate();
-                                                       }
-                                                       sendLock.notifyAll();
-                                                       if(logDEBUG) 
logDEBUG("notified");
-                                               }
-                                       }
-//                                     if (needToRemoveFromOCM) {
-//                                             if(logDEBUG) logDEBUG("Need to remove 
from OCM");
-//                                             // Not synced, this is called directly 
on the WSL thread
-//                                             needToRemoveFromOCM = false;
-//                                             if(logDEBUG) logDEBUG("Removed from 
OCM");
-//                                     }
+                       synchronized(sentPacketLock) {
+                               if(sentPacket != null) return false;
+                               sentPacket = peerHandler.getPacket(link, p, identity, 
ppm, 
+                                                                                      
            false);
+                               if(logDEBUG)
+                                       Core.logger.log(this, "Sending "+sentPacket+" 
on "+this+
+                                                                       
".forceSendPacket()", Logger.DEBUG);
+                       }
+                       if(sentPacket != null)
+                               return innerSendPacket(sentPacket.priority(), 
sentPacket);
+                       else return false;
                                } catch (Throwable t) {
-                                       Core.logger.log(this, "Caught throwable "+t+" 
in "+
-                                                                       
ConnectionHandler.this+"."+this+"."+
-                                                                       "jobDone",t, 
Logger.ERROR);
-                               }
-                               // notify MUST get called no matter what
-                               if(logDEBUG) logDEBUG("Notifying "+this);
-                               notifyDone();
-                               if(logDEBUG) logDEBUG("Notified "+this);
-                       }
+                       Core.logger.log(this, "Caught "+t+" in forceSendPacket", t,
+                                                       Logger.ERROR);
+                       return false;
                }
        }
        
@@ -2256,7 +2045,7 @@
         * Do NOT call while synchronized on sentPacketLock - hence the argument
         * @param sentPacket the packet to send
         */
-       protected void innerSendPacket(int prio, PeerPacket sentPacket) {
+       protected boolean innerSendPacket(int prio, PeerPacket sentPacket) {
                lastActiveTime = System.currentTimeMillis();
                PeerPacket sp = sentPacket;
                byte[] toSend = sp.getBytes();
@@ -2271,6 +2060,7 @@
                        }
                        sentPacket = null;
                        sp.jobDone(true, 0, peer, null);
+                       return false;
                } catch (Throwable t) {
                        Core.logger.log(this, "Caught "+t+ " trying to send packet "+
                                                        sentPacket, t, Logger.ERROR);
@@ -2280,7 +2070,9 @@
                        }
                        sentPacket = null;
                        sp.jobDone(true, 0, peer, null);
+                       return false;
                }
+               return true;
        }
        
 //     boolean needToRemoveFromOCM = false;
@@ -2301,311 +2093,6 @@
                                                                        htl);
        }
        
-//     public MessageSend sendMessageAsync(Message m) 
-//             throws SendFailedException {
-//             return sendMessageAsync(m, null);
-//     }
-       
-//     public MessageSend sendMessageAsync(Message m, MessageSendCallback cb)
-//             throws SendFailedException {
-//             RawMessage raw   = m.toRawMessage(p);
-//             synchronized(this) {
-//                     if (conn == null || trailingPresent)  
-//                             //this should not be set when entering new message
-//                             throw new SendFailedException(peer.getAddress(), 
false);
-//                     return innerSendMessageAsync(m, raw, cb);
-//             }
-//     }
-       
-//     public void unsendMessageAsync(MessageSendCallback cb) {
-//             if(sendClosed.state()) return;
-//             synchronized(sendLock) {
-//                     for(Iterator i=sendingQueue.listIterator(0);i.hasNext();) {
-//                             MessageSend m = (MessageSend)(i.next());
-//                             if(m.cb == cb) {
-//                                     if(m.hasTrailing)
-//                                             trailingPresent = false; // we hope!
-//                                     i.remove();
-//                                     sendingCount--;
-//                                     synchronized(sendingQueueBytesLock) {
-//                                             sendingQueueBytes -= m.toSend.length;
-//                                             if (logDEBUG)
-//                                                     Core.logger.log(this, "Removed 
(E) message of "+
-//                                                                                    
 "size "+m.toSend.length+" to "+
-//                                                                                    
 "sendingQueue: now "+
-//                                                                                    
 sendingQueueBytes+" bytes for "+
-//                                                                                    
 this, Logger.DEBUG);
-//                                     }
-//                                     return; // Hopefully there isn't more than 
one...
-//                             }
-//                     }
-//             }
-//     }
-
-//     protected MessageSend innerSendMessageAsync(Message m, RawMessage raw,
-//                                                                                    
         MessageSendCallback cb) 
-//             //no other way really
-//             throws SendFailedException {
-//             logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
-//             try {
-//                     if (sendClosed.state())
-//                             throw new SendFailedException(peer.getAddress(), 
false);
-// //                  needToRemoveFromOCM = false;
-//                     synchronized(sendLock) {
-//                             // Do not remove sendLock without seeing comments below
-//                             if(trailingPresent)
-//                                     checkReallySending();
-//                             if (sendClosed.state())
-//                             // check again when synced
-//                                     throw new 
SendFailedException(peer.getAddress(), false);
-//                             if(trailerSendID != -1)
-//                                     throw new 
SendFailedException(peer.getAddress(), false);
-//                             MessageSend ms = null;
-//                             try {
-//                                     sendingCount++;
-//                                     if (raw.trailingFieldLength > 0) {
-//                                             trailingPresent=true;
-//                                             if (logDEBUG)Core.logger.log(this, 
"enqueing trailer, queue size " + 
-//                                                                                    
                  sendingQueue.size()+" on "+this,
-//                                                                                    
                  Logger.DEBUG);
-//                                     }
-//                                     if(identity != null && 
-//                                        ocm.needsConnection(identity))
-//                                             
Main.node.scheduleConnectionOpener(identity);
-//                                     //waitForOneSender();
-//                                     // if we were just waiting for the connection 
to become free..
-//                                     if(m == null) return null;
-//                                     if(raw.close) {
-//                                             //needToRemoveFromOCM = true;
-//                                             logDEBUG("Closing because message says 
so");
-//                                             sendClosed.change(true);
-//                                             //} else if (raw.sustain) {
-//                                             //    persist.change(true);
-//                                             // hm, we only want to persist if we 
*receive*
-//                                             // a sustain message, right?
-//                                             // sendingCount = 1 - it will get 
closed after it finishes
-//                                     }
-//                                     if(logDEBUG) logDEBUG(sendingCount + " 
messages in sendqueue");
-                                       
-//                                     if(logDEBUG) logDEBUG("Sending RawMessage: " + 
raw, true);
-                                       
-//                                     lastSizeDone = 0;
-//                                     // WATCHME
-//                                     if (Main.watchme != null) {
-//                                             logWatchme(m, raw);
-//                                     }
-                                       
-//                                     // users are fond of this log entry
-//                                     if (Core.logger.shouldLog(Logger.MINOR,this))
-//                                             Core.logger.log(this,
-//                                                                             
raw.messageType + " @ " +
-//                                                                             
Long.toHexString(m.id())
-//                                                                             + " -> 
" + peer.getAddress(),
-//                                                                             
Logger.MINOR);
-                                       
-//                                     // Now send the message
-//                                     ByteArrayOutputStream bais = new 
ByteArrayOutputStream();
-//                                     Link l = link;
-//                                     if(l == null) return null;
-//                                     OutputStream os = l.makeOutputStream(bais);
-//                                     // We are synchronized on sendLock, so we do 
not need
-//                                     // to lock os
-//                                     try {
-//                                             raw.writeMessage(os);
-//                                             os.flush();
-//                                     } catch (IOException e) {
-//                                             e.printStackTrace();
-//                                             Core.logger.log(this, "Impossible 
exception for "+
-//                                                                             
ConnectionHandler.this+": "+e, e,
-//                                                                             
Logger.ERROR);
-//                                             throw new 
IllegalStateException("Impossible exception!: "+
-//                                                                                    
                         e);
-//                                     }
-//                                     byte[] b = bais.toByteArray();
-//                                     //moved the enqueing after the crypto 
operations
-//                                     //synchronized(sendingQueue) {  //lock the 
entire enqueueing 
-                                       
-//                                     ms = new MessageSend(m, raw, b, cb);
-                                       
-//                                     if(logDEBUG) logDEBUG("Got MessageSend "+ms+": 
"+b.length+" bytes");
-//                                     //}// end of monitor
-//                                     if (sendingQueue.size()==0 && 
sentMessages.size() == 0) {
-//                                             try {
-//                                                     if (logDEBUG) 
-//                                                             
Core.logger.log(this,"executing messageSend "+
-//                                                                                    
         "immediately ("+this+","+ms+")",
-//                                                                                    
         Logger.DEBUG);
-//                                                     sentMessages.add(ms);
-//                                                     sentMessagesCount = 1;
-//                                                     if(m instanceof Identify)
-//                                                             ms.priority = 
wsl.NEGOTIATION-1;
-//                                                     // Very important message!
-                                                       
-//                                                     ms.start();
-//                                             } catch (SendFailedException e) {
-//                                                     Core.logger.log(this, "Got 
IOException trying to "+
-//                                                                                    
 "sendMessage: "+this, e, 
-//                                                                                    
 Logger.MINOR);
-//                                                     ms = null;
-//                                                     throw e;
-//                                             }
-//                                     } else {
-//                                             sendingQueue.add(ms);
-//                                             Core.logger.log(this, "Added (D) 
message of size "+
-//                                                                             
b.length+" to sendingQueue: now "+
-//                                                                             
sendingQueue.size()+" for "+this,
-//                                                                             
Logger.DEBUG);
-//                                             synchronized(sendingQueueBytesLock) {
-//                                                     sendingQueueBytes += b.length;
-//                                                     Core.logger.log(this, "Added 
(D) message of "+
-//                                                                                    
 "size "+b.length+" to "+
-//                                                                                    
 "sendingQueue: now "+
-//                                                                                    
 sendingQueueBytes+
-//                                                                                    
 " bytes for "+this,
-//                                                                                    
 Logger.DEBUG);
-//                                             }
-//                                             if (logDEBUG) 
-//                                                     
Core.logger.log(this,"scheduling ms on queue."+
-//                                                                                    
 "  size now "+sendingQueue.size(),
-//                                                                                    
                           Logger.DEBUG);
-//                                             incSendQueue(b.length + 
raw.trailingFieldLength);
-//                                             if(logDEBUG) logDEBUG("Started 
MessageSend: "+ms);
-//                                     }
-//                                     return ms;
-//                             } finally {
-//                                     // only if there is a trailing field to write 
do we not
-//                                     // decrement sendingCount and notify() 
immediately
-//                                     // Can't possibly have started moving trailing 
if we have an error here
-//                                     if(logDEBUG) logDEBUG("in sendMessageAsync 
finally{}");
-//                                     //sending.decCount(); // done
-//                                     if(ms == null) {
-//                                             --sendingCount;
-//                                             if(logDEBUG) logDEBUG("send == null");
-//                                             if(logDEBUG) logDEBUG("Receiving 
"+receivingCount+
-//                                                              " messages, sending 
"+sendingCount+
-//                                                              " messages, 
receiveClosed="+
-//                                                              
receiveClosed.state()+", sendClosed="+
-//                                                              sendClosed.state()+" 
("+this+")");
-//                                             if (receiveClosed.state() && 
receivingCount == 0
-//                                                     && sendClosed.state() && 
sendingCount == 0) {
-//                                                     if(logDEBUG) 
logDEBUG("terminating");
-//                                                     terminate();
-//                                             }
-//                                             sendLock.notifyAll();
-//                                             if(logDEBUG) logDEBUG("notified");
-//                                     }
-//                             }
-//                     }
-//             } finally {
-//                     tcpConnection c = conn;
-//                     if(c != null) {
-//                             if(c.isClosed()) {
-//                                     terminate();
-//                             }
-// //                          else if (needToRemoveFromOCM) {
-// //                                  if(logDEBUG) logDEBUG("Need to remove from 
OCM");
-// //                                  removeFromOCM();
-// //                                  needToRemoveFromOCM = false;
-// //                                  if(logDEBUG) logDEBUG("Removed from OCM");
-// //                          }
-//                     }
-//             }
-//     }
-       
-//     /** Blocking send of a message
-//      * @return a TrailerWriter for the attached trailing field if there is one, 
null otherwise
-//      */
-//     public TrailerWriter sendMessage(Message m) 
-//             throws SendFailedException {
-//             MessageSend ms;
-//             RawMessage raw;
-//             long startedsend=0;
-//             TrailerWriter send = null;
-//             synchronized(this) { //extend locking even further
-//                     if (conn == null || trailingPresent)  //this should not be set 
when entering new message
-//                             throw new SendFailedException(peer.getAddress(), 
false);
-//                     if(logDEBUG) logDEBUG("SendMessage("+m+")", true);
-//                     raw   = m.toRawMessage(p);
-//                     if(logDEBUG) logDEBUG("raw is "+raw);
-//                     ms = innerSendMessageAsync(m, raw, null);
-//                     if(logDEBUG) logDEBUG("sendMessageAsync called");
-//             }
-//             synchronized(ms) {
-//                     while(!ms.done) {
-//                             try {
-//                                     if(logDEBUG) logDEBUG("Waiting for MessageSend 
"+ms);
-//                                     startedsend=System.currentTimeMillis();
-//                                     ms.wait(5*60*1000); //FIXME:hardcoded 5 minutes
-//                             } catch (InterruptedException e) {
-//                                     throw new 
SendFailedException(peer.getAddress(), true);
-//                             };
-//                             if (System.currentTimeMillis() - startedsend >= 
5*60*1000)
-//                                     throw new 
SendFailedException(peer.getAddress(), true);
-//                     }
-//                     if(logDEBUG) logDEBUG("Waited for MessageSend");
-//             }
-//             int sendId = -1;
-//             if(ms.sfe != null) {
-//                     Core.logger.log(this, "MessageSend produced 
SendFailedException "+
-//                                                     " for "+this+": "+ms.sfe, 
ms.sfe, Logger.MINOR);
-//                     throw ms.sfe;
-//             }
-//             boolean knowAboutTrailing = false;
-//             if(ms.success) {
-//                     if(logDEBUG) logDEBUG("Success!");
-//                     try {
-//                             if (ms.hasTrailing) {
-//                                     // has trailing
-//                                     knowAboutTrailing = true;
-//                                     if(logDEBUG) logDEBUG("Has trailing");
-// //                                  OutputStream os = new CHOutputStream();
-//                                     Link l = link;
-//                                     if(l == null) throw new IOException("Closed in 
middle of send");
-                                       
-// //                                  os = l.makeOutputStream(os);
-//                                     synchronized(trailerSendIDCounterLock) {
-//                                             trailerSendIDCounter++;
-//                                             if(trailerSendIDCounter < 0) 
trailerSendIDCounter = 0;
-//                                             sendId = trailerSendIDCounter;
-//                                     }
-//                                     synchronized(trailerSendLock) {
-//                                             trailerSendID = sendId;
-//                                             trailerSendLength = 
ms.raw.trailingFieldLength;
-//                                             trailerSentBytes = 0;
-//                                     }
-//                                     send = new MyTrailerWriter(sendId);
-//                                     if(logDEBUG) logDEBUG("Started 
MyTrailerWriter("+sendId+"): "+send);
-//                             } else {
-//                                     if(logDEBUG) logDEBUG("Does not have 
trailing");
-//                                     // does not
-//                                     //Core.logger.log(this, "Message sent.", 
Logger.DEBUG);
-//                                     Core.randSource.acceptTimerEntropy(sendTimer);
-//                                     lastActiveTime = System.currentTimeMillis();
-//                             } 
-//                     } catch (IOException e) {
-//                             Core.logger.log(this, "Got IOException trying to 
sendMessage: "+this, e, Logger.DEBUG);
-//                             tcpConnection c = conn;
-//                             if(c != null) {
-//                                     if(c.isClosed()) {
-//                                             terminate();
-//                                     }//  else {
-// //                                          removeFromOCM();
-// //                                  }
-//                             } // else already closed
-//                             synchronized(sendClosed) {
-//                                     sendClosed.change(true);
-//                             }
-//                             // terminal failure
-//                             throw new SendFailedException(peer.getAddress(), true);
-//                     }
-//             }
-//             if(logDEBUG) logDEBUG("Returning "+send);
-//             return send;
-//     }
-       
-
-
-
     // IMPORTANT: Don't call this while holding sendLock
     //            or receiveLock or you will cause a
     //            deadlock.
@@ -2650,7 +2137,7 @@
                                sendingCloseMessage = false;
                        }
                }
-               innerSendPacket(wsl.MESSAGE, sentPacket);
+               innerSendPacket(sentPacket.priority(), sentPacket);
                if(logDEBUG)
                        logDEBUG("Sent close packet");
     }
@@ -3248,8 +2735,8 @@
 //     }
        
        public String toString() {
-               return super.toString()+" for "+conn+","+link+", sending "+sentPacket+":"+
-                       trailerSendID;
+               return super.toString()+" for "+conn+","+link+", sending "+sentPacket+
+                       ":"+trailerSendID+" for "+peerHandler;
        }
 }
 

Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- PeerHandler.java    16 Oct 2003 01:03:58 -0000      1.22
+++ PeerHandler.java    17 Oct 2003 01:43:28 -0000      1.23
@@ -46,6 +46,7 @@
        int totalOutboundFailures = 0;
        
        boolean logDEBUG;
+       boolean inRT = false;
     
     int maxPacketSize;
        
@@ -188,7 +189,7 @@
        public NodeReference updateReference(NodeReference nr) {
                if(logDEBUG)
                        Core.logger.log(this, "updateReference("+nr+") on "+this,
-                                                       Logger.DEBUG);
+                                                       new Exception("debug"), Logger.DEBUG);
                if(nr.supersedes(ref)) {
                        if(logDEBUG)
                                Core.logger.log(this, "superceded old ref "+ref+" on "+this,
@@ -221,7 +222,7 @@
                boolean inRT = false;
                if(weak) {
                        quitNow = messages.isEmpty() && messagesWithTrailers.isEmpty() &&
-                               (id == null || (!(inRT = node.rt.references(id))));
+                               !(inRT = !(id == null || !node.rt.references(id)));
                        if(quitNow && connectionHandlers.isEmpty()) {
                                if(Core.logger.shouldLog(Logger.DEBUG, this))
                                        Core.logger.log(this, "returning false immediately",
@@ -343,6 +344,7 @@
                messagesWithTrailers = new LinkedList();
                connectionHandlers = new LinkedList();
                logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
+               inRT = (id == null || !(node.rt.references(id)));
     }
        
        public long timeSinceLastMessageSent() {
@@ -372,6 +374,7 @@
        }
        
        protected void innerSendMessageAsync(PeerPacketMessage pm) {
+               inRT = !(id == null || !node.rt.references(id));
                /* Possibilities:
                 *
                 * 1. No ConnectionHandlers (without trailing fields) open.
@@ -394,6 +397,8 @@
                                                Logger.DEBUG);
                lastMessageSentTime = System.currentTimeMillis();
                int handlersSendingPackets = 0;
+               while(true) {
+                       ConnectionHandler freeConn = null;
                synchronized(connectionHandlers) {
                        for(Iterator e = connectionHandlers.listIterator(0);
                                e.hasNext();) {
@@ -402,8 +407,9 @@
                                if(!ch.isOpen()) {
                                        // Can't send messages
                                        if(logDEBUG) Core.logger.log(this, ch.toString()+
-                                                                       " can't send messages for "+pm+" on "+
-                                                                       this, Logger.DEBUG);
+                                                                                                       " can't send messages for "
+                                                                                                       +pm+" on "+this,
+                                                                                                       Logger.DEBUG);
                                        e.remove();
                                        continue;
                                }
@@ -422,16 +428,25 @@
                                }
                                // Not busy!
                                if(logDEBUG) Core.logger.log(this, ch.toString()+
-                                                               ": send one packet "+pm+" for "+this,
+                                                                                               ": send one packet "+pm+
+                                                                                               " for "+this,
                                                                Logger.DEBUG);
+                                       freeConn = ch;
+                                       break;
+                               }
+                       }
+                       if(freeConn != null) {
                                try {
-                                       if(sendSinglePacket(ch, pm)) return;
+                                       sendSinglePacket(freeConn, pm);
+                                       if(messages.isEmpty() && messagesWithTrailers.isEmpty())
+                                               return; // Otherwise there is more to send on another conn - if we can find one
                                } catch (IOException ex) {
-                                       Core.logger.log(this, ch.toString()+
+                                       Core.logger.log(this, freeConn.toString()+
                                                                        ": caught "+ex+" trying to send packet ("+
                                                                        this+")", Logger.MINOR);
                                }
                        }
+                       break;
                }
                // No suitable ConnectionHandlers found!
                // Queue the message
@@ -494,18 +509,23 @@
         * message on the connection because of a locking conflict
      */
     protected boolean sendSinglePacket(ConnectionHandler ch, 
-                                                                          PeerPacketMessage pm) 
+                                                                          PeerPacketMessage ppm) 
                throws IOException {
-               PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
-               PeerPacket packet = new PeerPacket(msgs, ch.getLink(),
-                                                                                  ch.presentationType());
-               return ch.sendPacket(packet, 
-                                                        freenet.transport.WriteSelectorLoop.MESSAGE);
+               return ch.forceSendPacket(ppm);
     }
     
        public final PeerPacket getPacket(Link link, Presentation p) 
                throws IOException {
-               return getPacket(link, p, null, null, false);
+               return getPacket(link, p, null, (PeerPacketMessage)null, false);
+       }
+       
+       public PeerPacket getPacket(Link link, Presentation p,
+                                                               Identity i, Message m,
+                                                               boolean onlyGivenMsg) 
+               throws IOException {
+               return getPacket(link, p, i, 
+                                                new PeerPacketMessage(i, m, null, NORMAL, this),
+                                                onlyGivenMsg);
        }
        
     /**
@@ -520,7 +540,7 @@
         * @throws IOException if the connection is already closed
      */
     public PeerPacket getPacket(Link link, Presentation p,
-                                                               Identity i, Message m,
+                                                               Identity i, PeerPacketMessage m,
                                                                boolean onlyGivenMsg) 
                throws IOException {
                logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);  
@@ -538,9 +558,11 @@
                LinkedList packetMessages = new LinkedList();
                if(logDEBUG)
                        Core.logger.log(this, "getPacket("+link+","+p+","+i+","+m+","+
-                                                       onlyGivenMsg+" on "+this, Logger.DEBUG);
+                                                       onlyGivenMsg+" on "+this+" ("+messages.size()+
+                                                       " messages, "+messagesWithTrailers.size()+
+                                                       " messages with trailers", Logger.DEBUG);
                if(m != null) {
-                       packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL, this));
+                       packetMessages.add(m);
                }
                if(!onlyGivenMsg) {
                        boolean msgsEmpty = false;
@@ -590,7 +612,7 @@
                        PeerPacketMessage[] msgs = 
                                new PeerPacketMessage[packetMessages.size()];
                        packetMessages.toArray(msgs);
-                       PeerPacket packet = new PeerPacket(msgs, link, p);
+                       PeerPacket packet = new PeerPacket(msgs, link, p, inRT);
                        if(logDEBUG)
                                Core.logger.log(this, "Returning "+packet+" from getPacket ("+
                                                                this+")", Logger.DEBUG);

Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -w -r1.9 -r1.10
--- PeerPacket.java     11 Oct 2003 20:00:14 -0000      1.9
+++ PeerPacket.java     17 Oct 2003 01:43:28 -0000      1.10
@@ -3,6 +3,7 @@
 import java.io.OutputStream;
 import java.io.IOException;
 import freenet.Core;
+import freenet.message.*;
 import freenet.support.Logger;
 
 class PeerPacket {
@@ -12,6 +13,8 @@
     int sentBytes;
     boolean hasTrailer = false;
     boolean hasCloseMessage = false;
+    int priority;
+    boolean isInRT;
     
     public String toString() {
        return super.toString()+": "+messages.length+" msgs, "+
@@ -23,8 +26,12 @@
      * Create a PeerPacket
      * @throws IOException if the link is already closed
      */
-    PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p)
+    PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p,
+              boolean isInRT)
        throws IllegalArgumentException, IOException {
+       this.isInRT = isInRT;
+       int prio = freenet.transport.WriteSelectorLoop.MESSAGE+1;
+       if(isInRT) prio--;
        sentBytes = 0;
        messages = msgs;
        int totalLength = 0;
@@ -39,6 +46,13 @@
            }
            if(m.msg.close)
                hasCloseMessage = true;
+           if(!(m.msg instanceof QueryRejected))
+               prio--;
+           if(m.msg instanceof Request || m.msg instanceof DataInsert)
+               prio--; // Requests get 2 points
+           if(m.msg instanceof Identify)
+               prio-=4; // IMPORTANT to get the connection ready to go
+           // FIXME: query the messages for their priority?
        }
        data = new byte[totalLength];
        int x = 0;
@@ -47,6 +61,7 @@
            System.arraycopy(msgBytes, 0, data, x, msgBytes.length);
            x += msgBytes.length;
        }
+       priority = prio;
     }
     
     public int getLength() {
@@ -74,6 +89,10 @@
     
     public int countMessages() {
        return messages.length;
+    }
+    
+    public int priority() {
+       return priority;
     }
     
     /**

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.444
retrieving revision 1.445
diff -u -w -r1.444 -r1.445
--- Version.java        16 Oct 2003 01:03:58 -0000      1.444
+++ Version.java        17 Oct 2003 01:43:28 -0000      1.445
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6251;
+    public static final int buildNumber = 6252;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to