Author: gdusbabek Date: Tue Jul 20 16:55:51 2010 New Revision: 965906 URL: http://svn.apache.org/viewvc?rev=965906&view=rev Log: outbound tcp connections were never being shutdown properly, causing occasional dropped messages. patch by gdusbabek, reviewed by jbellis. CASSANDRA-1221
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=965906&r1=965905&r2=965906&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Tue Jul 20 16:55:51 2010 @@ -50,7 +50,7 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -public class MessagingService implements IFailureDetectionEventListener +public class MessagingService { private static int version_ = 1; //TODO: make this parameter dynamic somehow. Not sure if config is appropriate. @@ -129,7 +129,7 @@ public class MessagingService implements /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */ public void convict(InetAddress ep) { - logger_.trace("Resetting pool for " + ep); + logger_.debug("Resetting pool for " + ep); getConnectionPool(ep).reset(); } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=965906&r1=965905&r2=965906&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Tue Jul 20 16:55:51 2010 @@ -84,6 +84,9 @@ public class OutboundTcpConnection exten } if (socket != null || connect()) writeConnected(bb); + else + // clear out the queue, else gossip messages back up. + queue.clear(); } } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=965906&r1=965905&r2=965906&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 20 16:55:51 2010 @@ -874,7 +874,10 @@ public class StorageService implements I deliverHints(endpoint); } - public void onDead(InetAddress endpoint, EndPointState state) {} + public void onDead(InetAddress endpoint, EndPointState state) + { + MessagingService.instance.convict(endpoint); + } /** raw load value */ public double getLoad()