Repository: cassandra
Updated Branches:
  refs/heads/trunk 9a2cae69c -> d30614428


Unregister from FailureDetector when all validations are done

patch by yukim; reviewed by jbellis for CASSANDRA-9145


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d3061442
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d3061442
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d3061442

Branch: refs/heads/trunk
Commit: d306144282595be6818fa386a3fbb4aece040884
Parents: 9a2cae6
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue May 12 14:12:07 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue May 12 14:12:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                            |  2 +-
 src/java/org/apache/cassandra/repair/RepairJob.java    |  6 ------
 .../org/apache/cassandra/repair/RepairSession.java     | 13 +++++++++++++
 3 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed62eed..e4956c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -93,7 +93,7 @@
  * Use unsafe mutations for most unit tests (CASSANDRA-6969)
  * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
  * Fail on very large batch sizes (CASSANDRA-8011)
- * Improve concurrency of repair (CASSANDRA-6455, 8208)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208, 9145)
  * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
  * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
  * Generalize progress reporting (CASSANDRA-8901)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 850a894..754e26f 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -115,11 +114,6 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         {
             public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
trees) throws Exception
             {
-                // Unregister from FailureDetector once we've completed 
synchronizing Merkle trees.
-                // After this point, we rely on tcp_keepalive for individual 
sockets to notify us when a connection is down.
-                // See CASSANDRA-3569
-                
FailureDetector.instance.unregisterFailureDetectionEventListener(session);
-
                 InetAddress local = FBUtilities.getLocalAddress();
 
                 List<SyncTask> syncTasks = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 70bfaa6..a2dcdd1 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -91,6 +92,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
+    // number of validations left to be performed
+    private final AtomicInteger validationRemaining;
+
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating 
ConcurrentMap (keyed by CF name and endpoint address)
@@ -134,6 +138,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
+        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -176,6 +181,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
+
+        // Unregister from FailureDetector once we've completed synchronizing 
Merkle trees.
+        // After this point, we rely on tcp_keepalive for individual sockets 
to notify us when a connection is down.
+        // See CASSANDRA-3569
+        if (validationRemaining.decrementAndGet() == 0)
+        {
+            
FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+        }
     }
 
     /**

Reply via email to