Author: jbellis
Date: Tue Jan 11 23:39:33 2011
New Revision: 1057932

URL: http://svn.apache.org/viewvc?rev=1057932&view=rev
Log:
backport CASSANDRA-1959 from 0.7

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1057932&r1=1057931&r2=1057932&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Jan 11 23:39:33 2011
@@ -1,6 +1,8 @@
 0.6.10
-  * buffer network stack to avoid inefficient small TCP messages while avoiding
-    the nagle/delayed ack problem (CASSANDRA-1896)
+ * buffer network stack to avoid inefficient small TCP messages while avoiding
+   the nagle/delayed ack problem (CASSANDRA-1896)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959)
+
 
 0.6.9
  * add clustertool, config-converter, sstablekeys, and schematool 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057932&r1=1057931&r2=1057932&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:39:33 2011
@@ -29,6 +29,7 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +54,7 @@ import org.apache.cassandra.utils.Expiri
 import org.apache.cassandra.utils.GuidGenerator;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class MessagingService implements ILatencyPublisher
 {
@@ -65,7 +67,7 @@ public class MessagingService implements
 
     /* This records all the results mapped by message Id */
     private static ExpiringMap<String, IMessageCallback> callbacks;
-    private static Multimap<String, InetAddress> targets;
+    private static ConcurrentMap<String, Collection<InetAddress>> targets = 
new NonBlockingHashMap<String, Collection<InetAddress>>();
 
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -109,7 +111,7 @@ public class MessagingService implements
         {
             public Object apply(String messageId)
             {
-                Collection<InetAddress> addresses = 
targets.removeAll(messageId);
+                Collection<InetAddress> addresses = targets.remove(messageId);
                 if (addresses == null)
                     return null;
 
@@ -122,7 +124,6 @@ public class MessagingService implements
                 return null;
             }
         };
-        targets = ArrayListMultimap.create();
         callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
 
         defaultExecutor_ = new 
JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
@@ -240,12 +241,33 @@ public class MessagingService implements
         addCallback(cb, messageId);
         for (InetAddress endpoint : to)
         {
-            targets.put(messageId, endpoint);
+            putTarget(messageId, endpoint);
             sendOneWay(message, endpoint);
         }
         return messageId;
     }
 
+    private static void putTarget(String messageId, InetAddress endpoint)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        if (addresses == null)
+        {
+            addresses = new NonBlockingHashSet<InetAddress>();
+            Collection<InetAddress> oldAddresses = 
targets.putIfAbsent(messageId, addresses);
+            if (oldAddresses != null)
+                addresses = oldAddresses;
+        }
+        addresses.add(endpoint);
+    }
+
+    private static void removeTarget(String messageId, InetAddress from)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        // null is expected if we removed the callback or we got a reply after its timeout expired
+        if (addresses != null)
+            addresses.remove(from);
+    }
+
     public void addCallback(IAsyncCallback cb, String messageId)
     {
         callbacks.put(messageId, cb);
@@ -265,7 +287,7 @@ public class MessagingService implements
     {        
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
-        targets.put(messageId, to);
+        putTarget(messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -294,7 +316,7 @@ public class MessagingService implements
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            targets.put(groupId, to[i]);
+            putTarget(groupId, to[i]);
             sendOneWay(messages[i], to[i]);
         }
         return groupId;
@@ -348,7 +370,7 @@ public class MessagingService implements
     {
         IAsyncResult iar = new AsyncResult();
         callbacks.put(message.getMessageId(), iar);
-        targets.put(message.getMessageId(), to);
+        putTarget(message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -429,7 +451,7 @@ public class MessagingService implements
     
     public static IMessageCallback removeRegisteredCallback(String messageId)
     {
-        targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+        targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
         return callbacks.remove(messageId);
     }
 
@@ -440,7 +462,7 @@ public class MessagingService implements
 
     public static void responseReceivedFrom(String messageId, InetAddress from)
     {
-        targets.remove(messageId, from);
+        removeTarget(messageId, from);
     }
 
     public static void validateMagic(int magic) throws IOException


Reply via email to