Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 4c1597011 -> 9966419db refs/heads/cassandra-2.2 8b021db7c -> dee675f1e refs/heads/trunk 4c4c4327a -> 43d21c384
Make rebuild only run one at a time patch by yukim; reviewed by jmckenzie for CASSANDRA-9119 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9966419d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9966419d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9966419d Branch: refs/heads/cassandra-2.1 Commit: 9966419dbda995421f41ccc769d3b89d63940c82 Parents: 4c15970 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Jun 3 14:44:11 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Jun 17 20:41:16 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 36 ++++++++++++++------ 2 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f3f9f0..1d72c9a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ Merged from 2.0 * ArrivalWindow should use primitives (CASSANDRA-9496) * Periodically submit background compaction tasks (CASSANDRA-9592) * Set HAS_MORE_PAGES flag to false when PagingState is null (CASSANDRA-9571) + * Make rebuild only run one at a time (CASSANDRA-9119) 2.1.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7c8e424..e063c63 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -50,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -237,7 +238,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private boolean isBootstrapMode; /* we bootstrap but do NOT join the ring unless told to do so */ - private boolean isSurveyMode= Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false")); + private boolean isSurveyMode = Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false")); + /* true if node is rebuilding and receiving data */ + private final AtomicBoolean isRebuilding = new AtomicBoolean(); /* when intialized as a client, we shouldn't write to the system keyspace. */ private boolean isClientMode; @@ -1023,19 +1026,27 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void rebuild(String sourceDc) { - logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc); - - RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild"); - streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); - if (sourceDc != null) - streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); + // check on going rebuild + if (!isRebuilding.compareAndSet(false, true)) + { + throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats."); + } - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc); try { - streamer.fetchAsync().get(); + RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild"); + streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); + if (sourceDc != null) + streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); + + for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) + streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName)); + + StreamResultFuture resultFuture = streamer.fetchAsync(); + // wait for result + resultFuture.get(); } catch (InterruptedException e) { @@ -1047,6 +1058,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.error("Error while rebuilding node", e.getCause()); throw new RuntimeException("Error while rebuilding node: " + e.getCause().getMessage()); } + finally + { + // rebuild is done (successfully or not) + isRebuilding.set(false); + } } public void setStreamThroughputMbPerSec(int value)