Repository: cassandra Updated Branches: refs/heads/cassandra-3.1 521cc5425 -> 8feb66e6f
Wait for migration responses to complete before bootstrapping patch by Mike Adamson; reviewed by Sergio Bossa for CASSANDRA-10731 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae315b5e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae315b5e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae315b5e Branch: refs/heads/cassandra-3.1 Commit: ae315b5ec944571342146867c51b2ceb50f3845e Parents: 29b988d Author: Mike Adamson <madam...@datastax.com> Authored: Mon Nov 16 15:48:33 2015 +0000 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Nov 24 23:18:14 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/MigrationManager.java | 24 ++++++++++++++++-- .../apache/cassandra/service/MigrationTask.java | 26 ++++++++++++++++++++ .../cassandra/service/StorageService.java | 9 +++---- 4 files changed, 53 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 608d8f8..116d4c3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.1 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731) * Unable to create a function with argument of type Inet (CASSANDRA-10741) * Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717) * Correctly preserve deletion info on updated rows when notifying indexers http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index b7f9bf3..c0b5b10 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -59,8 +59,10 @@ public class MigrationManager public static final int MIGRATION_DELAY_IN_MS = 60000; + private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); + private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>(); - + private MigrationManager() {} public void register(MigrationListener listener) @@ -148,7 +150,25 @@ public class MigrationManager public static boolean isReadyForBootstrap() { - return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0; + return MigrationTask.getInflightTasks().isEmpty(); + } + + public static void waitUntilReadyForBootstrap() + { + CountDownLatch completionLatch; + while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) + { + try + { + if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) + logger.error("Migration task failed to complete"); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + logger.error("Migration task was interrupted"); + } + } } public void notifyCreateKeyspace(KeyspaceMetadata ksm) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index 8a1b858..39a5a11 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -20,11 +20,17 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.SystemKeyspace.BootstrapState; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallback; @@ -39,6 +45,10 @@ class MigrationTask extends WrappedRunnable { private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); + private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); + + private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); + private final InetAddress endpoint; MigrationTask(InetAddress endpoint) @@ -46,6 +56,11 @@ class MigrationTask extends WrappedRunnable this.endpoint = endpoint; } + public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() + { + return inflightTasks; + } + public void runMayThrow() throws Exception { // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), @@ -65,6 +80,8 @@ class MigrationTask extends WrappedRunnable MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); + final CountDownLatch completionLatch = new CountDownLatch(1); + IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() { @Override @@ -78,6 +95,10 @@ class MigrationTask extends WrappedRunnable { logger.error("Configuration exception merging remote schema", e); } + finally + { + completionLatch.countDown(); + } } public boolean isLatencyForSnitch() @@ -85,6 +106,11 @@ class MigrationTask extends WrappedRunnable return false; } }; + + // Only save the latches if we need bootstrap or are bootstrapping + if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) + inflightTasks.offer(completionLatch); + MessagingService.instance().sendRR(message, endpoint, cb); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 1c20a22..1baa478 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.service; -import static java.nio.charset.StandardCharsets.ISO_8859_1; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.File; @@ -848,12 +846,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } - // if our schema hasn't matched yet, keep sleeping until it does + // if our schema hasn't matched yet, wait until it has + // we do this by waiting for all in-flight migration requests and responses to complete // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) - while (!MigrationManager.isReadyForBootstrap()) + if (!MigrationManager.isReadyForBootstrap()) { setMode(Mode.JOINING, "waiting for schema information to complete", true); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + MigrationManager.waitUntilReadyForBootstrap(); } setMode(Mode.JOINING, "schema complete, ready to bootstrap", true); setMode(Mode.JOINING, "waiting for pending range calculation", true);