Ensure atomicity inside thrift and stream session

patch by Paulo Motta; reviewed by yukim for CASSANDRA-7757


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a4728f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a4728f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a4728f6

Branch: refs/heads/cassandra-2.2
Commit: 0a4728f62b51095706bf7155e8f60b39ec5fa082
Parents: dce303b
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Wed Jul 29 19:52:40 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 29 19:53:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                                 | 1 +
 src/java/org/apache/cassandra/streaming/StreamSession.java  | 9 ++++++---
 .../org/apache/cassandra/thrift/ThriftSessionManager.java   | 9 ++++++---
 3 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4ef77ed..5cfc347 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
    config file or environment variables (CASSANDRA-9544)
  * Remove repair snapshot leftover on startup (CASSANDRA-7357)
  * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
+ * Ensure atomicity inside thrift and stream session (CASSANDRA-7757)
 Merged from 2.0:
  * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
  * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 1edfedb..63219d8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -137,7 +137,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     // stream requests to send to the peer
     private final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
     // streaming tasks are created and managed per ColumnFamily ID
-    private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
     // data receivers, filled after receiving prepare message
     private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
     private final StreamingMetrics metrics;
@@ -369,8 +369,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             StreamTransferTask task = transfers.get(cfId);
             if (task == null)
             {
-                task = new StreamTransferTask(this, cfId);
-                transfers.put(cfId, task);
+                //guarantee atomicity
+                StreamTransferTask newTask = new StreamTransferTask(this, cfId);
+                task = transfers.putIfAbsent(cfId, newTask);
+                if (task == null)
+                    task = newTask;
             }
             task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt);
             iter.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index ed3df6d..6caa558 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -36,7 +36,7 @@ public class ThriftSessionManager
     public final static ThriftSessionManager instance = new ThriftSessionManager();
 
     private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>();
-    private final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
 
     /**
      * @param socket the address on which the current thread will work on requests for until further notice
@@ -57,8 +57,11 @@ public class ThriftSessionManager
         ThriftClientState cState = activeSocketSessions.get(socket);
         if (cState == null)
         {
-            cState = new ThriftClientState(socket);
-            activeSocketSessions.put(socket, cState);
+            //guarantee atomicity
+            ThriftClientState newState = new ThriftClientState(socket);
+            cState = activeSocketSessions.putIfAbsent(socket, newState);
+            if (cState == null)
+                cState = newState;
         }
         return cState;
     }

Reply via email to