Fix incremental repair hang when replica is down

patch by yukim; reviewed by marcuse for CASSANDRA-10288
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1538c092 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1538c092 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1538c092 Branch: refs/heads/cassandra-3.1 Commit: 1538c0921444d7969ebd07ca1abda9a7e40e4c73 Parents: 0b26ca6 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Dec 2 08:41:11 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Dec 2 08:41:11 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/repair/AnticompactionTask.java | 29 +++++++++++++------- .../cassandra/service/ActiveRepairService.java | 17 +++++++++--- 3 files changed, 33 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e00abfe..9c5e2d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.12 + * Fix incremental repair hang when replica is down (CASSANDRA-10288) * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791) * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) * Add proper error handling to stream receiver (CASSANDRA-10774) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java index f41d26c..8b68fd3 100644 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@ -17,6 +17,7 @@ */ package 
org.apache.cassandra.repair; +import java.io.IOException; import java.net.InetAddress; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@ -52,25 +54,32 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R public void run() { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); - SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); - if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) + if (FailureDetector.instance.isAlive(neighbor)) { - if (doAnticompaction) + AnticompactionRequest acr = new AnticompactionRequest(parentSession); + SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); + if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) { - MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + if (doAnticompaction) + { + MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + } + else + { + // we need to clean up parent session + MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + } } else { - // we need to clean up parent session - MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); + // immediately return after 
sending request + set(neighbor); } } else { - MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); - // immediately return after sending request - set(neighbor); + setException(new IOException(neighbor + " is down")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 4266f41..dd80d4c 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -277,11 +277,20 @@ public class ActiveRepairService for (ColumnFamilyStore cfs : columnFamilyStores) cfIds.add(cfs.metadata.cfId); - for(InetAddress neighbour : endpoints) + for (InetAddress neighbour : endpoints) { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges); - MessageOut<RepairMessage> msg = message.createMessage(); - MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); + if (FailureDetector.instance.isAlive(neighbour)) + { + PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges); + MessageOut<RepairMessage> msg = message.createMessage(); + MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); + } + else + { + status.set(false); + failedNodes.add(neighbour.getHostAddress()); + prepareLatch.countDown(); + } } try {