Author: gdusbabek
Date: Tue Jul 20 16:55:51 2010
New Revision: 965906

URL: http://svn.apache.org/viewvc?rev=965906&view=rev
Log:
outbound tcp connections were never being shutdown properly, causing occasional 
dropped messages. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1221

Modified:
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
 Tue Jul 20 16:55:51 2010
@@ -50,7 +50,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-public class MessagingService implements IFailureDetectionEventListener
+public class MessagingService
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is 
appropriate.
@@ -129,7 +129,7 @@ public class MessagingService implements
     /** called by failure detection code to notify that housekeeping should be 
performed on downed sockets. */
     public void convict(InetAddress ep)
     {
-        logger_.trace("Resetting pool for " + ep);
+        logger_.debug("Resetting pool for " + ep);
         getConnectionPool(ep).reset();
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
 Tue Jul 20 16:55:51 2010
@@ -84,6 +84,9 @@ public class OutboundTcpConnection exten
             }
             if (socket != null || connect())
                 writeConnected(bb);
+            else
+                // clear out the queue, else gossip messages back up.
+                queue.clear();            
         }
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=965906&r1=965905&r2=965906&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 Tue Jul 20 16:55:51 2010
@@ -874,7 +874,10 @@ public class StorageService implements I
             deliverHints(endpoint);
     }
 
-    public void onDead(InetAddress endpoint, EndPointState state) {}
+    public void onDead(InetAddress endpoint, EndPointState state) 
+    {
+        MessagingService.instance.convict(endpoint);
+    }
 
     /** raw load value */
     public double getLoad()


Reply via email to