Ensure atomicity inside thrift and stream session. Patch by Paulo Motta; reviewed by yukim for CASSANDRA-7757.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a4728f6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a4728f6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a4728f6 Branch: refs/heads/cassandra-2.2 Commit: 0a4728f62b51095706bf7155e8f60b39ec5fa082 Parents: dce303b Author: Paulo Motta <pauloricard...@gmail.com> Authored: Wed Jul 29 19:52:40 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Jul 29 19:53:07 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/streaming/StreamSession.java | 9 ++++++--- .../org/apache/cassandra/thrift/ThriftSessionManager.java | 9 ++++++--- 3 files changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4ef77ed..5cfc347 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ config file or environment variables (CASSANDRA-9544) * Remove repair snapshot leftover on startup (CASSANDRA-7357) * Use random nodes for batch log when only 2 racks (CASSANDRA-8735) + * Ensure atomicity inside thrift and stream session (CASSANDRA-7757) Merged from 2.0: * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793) * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 1edfedb..63219d8 100644 --- 
a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -137,7 +137,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber // stream requests to send to the peer private final Set<StreamRequest> requests = Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID - private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<>(); private final StreamingMetrics metrics; @@ -369,8 +369,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber StreamTransferTask task = transfers.get(cfId); if (task == null) { - task = new StreamTransferTask(this, cfId); - transfers.put(cfId, task); + //guarantee atomicity + StreamTransferTask newTask = new StreamTransferTask(this, cfId); + task = transfers.putIfAbsent(cfId, newTask); + if (task == null) + task = newTask; } task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt); iter.remove(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a4728f6/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java index ed3df6d..6caa558 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java +++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java @@ -36,7 +36,7 @@ public class ThriftSessionManager public final static ThriftSessionManager instance = new ThriftSessionManager(); private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>(); - private 
final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>(); /** * @param socket the address on which the current thread will work on requests for until further notice @@ -57,8 +57,11 @@ public class ThriftSessionManager ThriftClientState cState = activeSocketSessions.get(socket); if (cState == null) { - cState = new ThriftClientState(socket); - activeSocketSessions.put(socket, cState); + //guarantee atomicity + ThriftClientState newState = new ThriftClientState(socket); + cState = activeSocketSessions.putIfAbsent(socket, newState); + if (cState == null) + cState = newState; } return cState; }