Pull schema immediately when bootstrapping. Patch by Chris Herron and brandonwilliams, reviewed by jbellis for CASSANDRA-5025
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/442a7b3a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/442a7b3a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/442a7b3a Branch: refs/heads/cassandra-1.2.0 Commit: 442a7b3a60c5cc765221cc6f07add3546a8e9c3d Parents: 8d55474 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Dec 7 15:31:25 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Dec 7 15:31:25 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/service/MigrationManager.java | 57 +++++++++------ .../apache/cassandra/service/StorageService.java | 7 ++- 2 files changed, 41 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/442a7b3a/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 102ea12..72a9a84 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.ArrayList; @@ -92,6 +93,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public void onRemove(InetAddress endpoint) {} + /** + * If versions differ this node sends request with local migration list to the endpoint + * and expecting to receive a list of migrations to apply locally. + */ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) { // Can't request migrations from nodes with versions younger than 1.1.7 @@ -101,29 +106,39 @@ public class MigrationManager implements IEndpointStateChangeSubscriber if (Schema.instance.getVersion().equals(theirVersion)) return; - // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes - // being pushed out simultaneously. See CASSANDRA-5025 - Runnable runnable = new Runnable() + if (Schema.emptyVersion.equals(Schema.instance.getVersion())) { - public void run() + // If we think we may be bootstrapping, submit MigrationTask immediately + submitMigrationTask(endpoint); + } + else + { + // Include a delay to make sure we have a chance to apply any changes being + // pushed out simultaneously. See CASSANDRA-5025 + Runnable runnable = new Runnable() { - // grab the latest version of the schema since it may have changed again since the initial scheduling - VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) - return; - - /** - * if versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. - * - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. - */ - StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); - } - }; - StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES); + public void run() + { + // grab the latest version of the schema since it may have changed again since the initial scheduling + VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA); + UUID currentVersion = UUID.fromString(value.value); + if (Schema.instance.getVersion().equals(currentVersion)) + return; + + submitMigrationTask(endpoint); + } + }; + StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES); + } + } + + private static void submitMigrationTask(InetAddress endpoint) + { + /* + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. + */ + StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); } public static boolean isReadyForBootstrap() http://git-wip-us.apache.org/repos/asf/cassandra/blob/442a7b3a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 30da45c..d041279 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -544,8 +544,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Gossiper.instance.register(this); Gossiper.instance.register(migrationManager); Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering. - // gossip schema version when gossiper is running - Schema.instance.updateVersionAndAnnounce(); + + // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) + Schema.instance.updateVersion(); // Ensure we know our own actual Schema UUID in preparation for updates + MigrationManager.passiveAnnounce(Schema.emptyVersion); + // add rpc listening info Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress())); if (null != DatabaseDescriptor.getReplaceToken())