Repository: cassandra Updated Branches: refs/heads/trunk 9a2cae69c -> d30614428
Unregister FD when all validations are done. Patch by yukim; reviewed by jbellis for CASSANDRA-9145. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d3061442 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d3061442 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d3061442 Branch: refs/heads/trunk Commit: d306144282595be6818fa386a3fbb4aece040884 Parents: 9a2cae6 Author: Yuki Morishita <yu...@apache.org> Authored: Tue May 12 14:12:07 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue May 12 14:12:07 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- src/java/org/apache/cassandra/repair/RepairJob.java | 6 ------ .../org/apache/cassandra/repair/RepairSession.java | 13 +++++++++++++ 3 files changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ed62eed..e4956c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -93,7 +93,7 @@ * Use unsafe mutations for most unit tests (CASSANDRA-6969) * Fix race condition during calculation of pending ranges (CASSANDRA-7390) * Fail on very large batch sizes (CASSANDRA-8011) - * Improve concurrency of repair (CASSANDRA-6455, 8208) + * Improve concurrency of repair (CASSANDRA-6455, 8208, 9145) * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) * Evaluate MurmurHash of Token once per query (CASSANDRA-7096) * Generalize progress reporting (CASSANDRA-8901) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java index 850a894..754e26f 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -115,11 +114,6 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees) throws Exception { - // Unregister from FailureDetector once we've completed synchronizing Merkle trees. - // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. - // See CASSANDRA-3569 - FailureDetector.instance.unregisterFailureDetectionEventListener(session); - InetAddress local = FBUtilities.getLocalAddress(); List<SyncTask> syncTasks = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d3061442/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 70bfaa6..a2dcdd1 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -23,6 +23,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; @@ -91,6 +92,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> 
implement public final Set<InetAddress> endpoints; private final long repairedAt; + // number of validations left to be performed + private final AtomicInteger validationRemaining; + private final AtomicBoolean isFailed = new AtomicBoolean(false); // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) @@ -134,6 +138,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.range = range; this.endpoints = endpoints; this.repairedAt = repairedAt; + this.validationRemaining = new AtomicInteger(cfnames.length); } public UUID getId() @@ -176,6 +181,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement logger.info("[repair #{}] {}", getId(), message); Tracing.traceRepair(message); task.treeReceived(tree); + + // Unregister from FailureDetector once we've completed synchronizing Merkle trees. + // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. + // See CASSANDRA-3569 + if (validationRemaining.decrementAndGet() == 0) + { + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + } } /**