Avoid exceptions when performing schema migrations in a mixed 1.0/1.1 cluster. Patch by xedin; reviewed by slebresne for CASSANDRA-3804.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5888fcd6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5888fcd6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5888fcd6 Branch: refs/heads/trunk Commit: 5888fcd6aff4e8c00ad7f4c629c5feb1cf9663a0 Parents: 44d6f49 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Feb 23 17:22:11 2012 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Feb 23 18:02:28 2012 +0100 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/DefsTable.java | 7 +++++++ .../apache/cassandra/service/MigrationManager.java | 14 +++++++++++--- .../apache/cassandra/service/StorageService.java | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5888fcd6/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index 142e032..f656a33 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -42,6 +42,7 @@ import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.db.migration.MigrationHelper; import org.apache.cassandra.db.migration.avro.KsDef; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.CfDef; @@ -223,6 +224,12 @@ public class DefsTable */ public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException { + if (version < MessagingService.VERSION_11) + { + 
logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first."); + return; + } + // save current state of the schema Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF); Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5888fcd6/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index b37fc2b..6990d84 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -26,6 +26,9 @@ import java.net.InetAddress; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -84,6 +87,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public static void rectifySchema(UUID theirVersion, final InetAddress endpoint) { + // Can't request migrations from nodes with versions younger than 1.1 + if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11) + return; + if (Schema.instance.getVersion().equals(theirVersion)) return; @@ -126,6 +133,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber if (endpoint.equals(FBUtilities.getBroadcastAddress())) continue; // don't push schema mutation to self + // don't send migrations to the nodes with the versions older than < 1.1 + if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11) + continue; + pushSchemaMutation(endpoint, schema); } } @@ -195,9 
+206,6 @@ public class MigrationManager implements IEndpointStateChangeSubscriber */ public static Collection<RowMutation> deserializeMigrationMessage(byte[] data, int version) throws IOException { - if (version < MessagingService.VERSION_11) - throw new IOException("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first."); - Collection<RowMutation> schema = new ArrayList<RowMutation>(); DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5888fcd6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a0e99ee..91ea705 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -102,7 +102,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe GOSSIP_DIGEST_ACK2, DEFINITIONS_ANNOUNCE, // Deprecated DEFINITIONS_UPDATE, - MIGRATION_REQUEST, TRUNCATE, SCHEMA_CHECK, INDEX_SCAN, // Deprecated @@ -112,6 +111,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe STREAMING_REPAIR_REQUEST, STREAMING_REPAIR_RESPONSE, SNAPSHOT, // Similar to nt snapshot + MIGRATION_REQUEST, // use as padding for backwards compatability where a previous version needs to validate a verb from the future. UNUSED_1, UNUSED_2,