Updated Branches: refs/heads/cassandra-1.1 e376bc02e -> 44d6f49bb
fix for intermittent SchemaDisagreementException patch by Pavel Yaskevich; reviewed by Sylvain Lebresne for CASSANDRA-3884 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9dbb1b77 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9dbb1b77 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9dbb1b77 Branch: refs/heads/cassandra-1.1 Commit: 9dbb1b7757b1601e78254fde5a7142e93f46b620 Parents: e376bc0 Author: Pavel Yaskevich <xe...@apache.org> Authored: Thu Feb 23 18:03:17 2012 +0300 Committer: Pavel Yaskevich <xe...@apache.org> Committed: Thu Feb 23 18:28:15 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Schema.java | 2 +- .../org/apache/cassandra/cql/QueryProcessor.java | 1 - .../cql3/statements/SchemaAlteringStatement.java | 1 - .../cassandra/db/migration/AddColumnFamily.java | 6 +- .../apache/cassandra/db/migration/AddKeyspace.java | 6 +- .../cassandra/db/migration/DropColumnFamily.java | 6 +- .../cassandra/db/migration/DropKeyspace.java | 6 +- .../apache/cassandra/db/migration/Migration.java | 20 +++- .../cassandra/db/migration/MigrationHelper.java | 86 ++++++++++----- .../cassandra/db/migration/UpdateColumnFamily.java | 6 +- .../cassandra/db/migration/UpdateKeyspace.java | 8 +- .../apache/cassandra/thrift/CassandraServer.java | 1 - 13 files changed, 100 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a41bdba..2796c5c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ * Fix bug with counters in super columns (CASSANDRA-3821) * Remove deprecated merge_shard_chance (CASSANDRA-3940) * add a convenient way to reset a node's schema (CASSANDRA-2963) + * fix for intermittent SchemaDisagreementException (CASSANDRA-3884) Merged from 1.0: * remove the wait on hint future during write (CASSANDRA-3870) * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index d27b347..2ae507b 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -462,7 +462,7 @@ public class Schema for (Row row : SystemTable.serializedSchema()) { - if (row.cf == null || row.cf.getColumnCount() == 0) + if (row.cf == null || row.cf.isMarkedForDelete() || row.cf.isEmpty()) continue; row.cf.updateDigest(versionDigest); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/cql/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java index c539979..befcb21 100644 --- a/src/java/org/apache/cassandra/cql/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java @@ -332,7 +332,6 @@ public class QueryProcessor public Object call() throws Exception { m.apply(); - m.announce(); return null; } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index 573d0eb..a34942d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@ -129,7 +129,6 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL public Object call() throws Exception { m.apply(); - m.announce(); return null; } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java b/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java index b3762ca..944ee9d 100644 --- a/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java +++ b/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java @@ -18,11 +18,13 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; public class AddColumnFamily extends Migration { @@ -44,9 +46,9 @@ public class AddColumnFamily extends Migration this.cfm = cfm; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.addColumnFamily(cfm, timestamp); + return MigrationHelper.addColumnFamily(cfm, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/AddKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/AddKeyspace.java b/src/java/org/apache/cassandra/db/migration/AddKeyspace.java index 1cd1e99..d0f4bfd 100644 --- a/src/java/org/apache/cassandra/db/migration/AddKeyspace.java +++ b/src/java/org/apache/cassandra/db/migration/AddKeyspace.java @@ -19,11 +19,13 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; public class AddKeyspace extends Migration { @@ -44,9 +46,9 @@ public class AddKeyspace extends Migration this.ksm = ksm; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.addKeyspace(ksm, timestamp); + return MigrationHelper.addKeyspace(ksm, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java b/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java index 2f2bed2..b83cf9d 100644 --- a/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java +++ b/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java @@ -18,10 +18,12 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; public class DropColumnFamily extends Migration { @@ -42,9 +44,9 @@ public class DropColumnFamily extends Migration this.cfName = cfName; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.dropColumnFamily(ksName, cfName, timestamp); + return MigrationHelper.dropColumnFamily(ksName, cfName, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/DropKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/DropKeyspace.java b/src/java/org/apache/cassandra/db/migration/DropKeyspace.java index c6e200e..0fd8cb9 100644 --- a/src/java/org/apache/cassandra/db/migration/DropKeyspace.java +++ b/src/java/org/apache/cassandra/db/migration/DropKeyspace.java @@ -19,10 +19,12 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; public class DropKeyspace extends Migration { @@ -39,9 +41,9 @@ public class DropKeyspace extends Migration this.name = name; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.dropKeyspace(name, timestamp); + return MigrationHelper.dropKeyspace(name, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/Migration.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/Migration.java b/src/java/org/apache/cassandra/db/migration/Migration.java index 762fec3..faf6784 100644 --- a/src/java/org/apache/cassandra/db/migration/Migration.java +++ b/src/java/org/apache/cassandra/db/migration/Migration.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.migration; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.UUID; import org.slf4j.Logger; @@ -63,27 +64,36 @@ public abstract class Migration public final void apply() throws ConfigurationException, IOException { - applyImpl(); + Collection<RowMutation> mutations = applyImpl(); + + assert !mutations.isEmpty(); if (!StorageService.instance.isClientMode()) MigrationHelper.flushSchemaCFs(); Schema.instance.updateVersion(); + announce(mutations); } /** * Class specific apply implementation where schema migration logic should be put * + * @return mutations to update native schema + * * @throws IOException on any I/O related error. * @throws ConfigurationException if there is object misconfiguration. */ - protected abstract void applyImpl() throws ConfigurationException, IOException; + protected abstract Collection<RowMutation> applyImpl() throws ConfigurationException, IOException; - /** Send schema update (in form of row mutations) to alive nodes in the cluster. apply() must be called first. */ - public final void announce() + /** + * Send schema update (in form of row mutations) to alive nodes in the cluster. + * + * @param mutations to distribute in the cluster + */ + private void announce(Collection<RowMutation> mutations) { assert !StorageService.instance.isClientMode(); - MigrationManager.announce(SystemTable.serializeSchema()); + MigrationManager.announce(mutations); passiveAnnounce(); // keeps gossip in sync w/ what we just told everyone } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/MigrationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/MigrationHelper.java b/src/java/org/apache/cassandra/db/migration/MigrationHelper.java index d6a498e..c8d94da 100644 --- a/src/java/org/apache/cassandra/db/migration/MigrationHelper.java +++ b/src/java/org/apache/cassandra/db/migration/MigrationHelper.java @@ -19,11 +19,7 @@ package org.apache.cassandra.db.migration; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Future; import com.google.common.collect.Iterables; @@ -194,9 +190,9 @@ public class MigrationHelper /* Schema Mutation Helpers */ - public static void addKeyspace(KSMetaData ksm, long timestamp) throws ConfigurationException, IOException + public static Collection<RowMutation> addKeyspace(KSMetaData ksm, long timestamp) throws ConfigurationException, IOException { - addKeyspace(ksm, timestamp, true); + return addKeyspace(ksm, timestamp, true); } public static void addKeyspace(KSMetaData ksDef) throws ConfigurationException, IOException @@ -204,9 +200,9 @@ public class MigrationHelper addKeyspace(ksDef, -1, false); } - public static void addColumnFamily(CFMetaData cfm, long timestamp) throws ConfigurationException, IOException + public static Collection<RowMutation> addColumnFamily(CFMetaData cfm, long timestamp) throws ConfigurationException, IOException { - addColumnFamily(cfm, timestamp, true); + return addColumnFamily(cfm, timestamp, true); } public static void addColumnFamily(CfDef cfDef) throws ConfigurationException, IOException @@ -226,9 +222,9 @@ public class MigrationHelper updateKeyspace(newState, -1, false); } - public static void updateKeyspace(KsDef newState, long timestamp) throws ConfigurationException, IOException + public static Collection<RowMutation> updateKeyspace(KsDef newState, long timestamp) throws ConfigurationException, IOException { - updateKeyspace(newState, timestamp, true); + return updateKeyspace(newState, timestamp, true); } public static void updateColumnFamily(CfDef newState) throws ConfigurationException, IOException @@ -236,9 +232,9 @@ public class MigrationHelper updateColumnFamily(newState, -1, false); } - public static void updateColumnFamily(CfDef newState, long timestamp) throws ConfigurationException, IOException + public static Collection<RowMutation> updateColumnFamily(CfDef newState, long timestamp) throws ConfigurationException, IOException { - updateColumnFamily(newState, timestamp, true); + return updateColumnFamily(newState, timestamp, true); } public static void dropColumnFamily(String ksName, String cfName) throws IOException @@ -246,9 +242,9 @@ public class MigrationHelper dropColumnFamily(ksName, cfName, -1, false); } - public static void dropColumnFamily(String ksName, String cfName, long timestamp) throws IOException + public static Collection<RowMutation> dropColumnFamily(String ksName, String cfName, long timestamp) throws IOException { - dropColumnFamily(ksName, cfName, timestamp, true); + return dropColumnFamily(ksName, cfName, timestamp, true); } public static void dropKeyspace(String ksName) throws IOException @@ -256,14 +252,14 @@ public class MigrationHelper dropKeyspace(ksName, -1, false); } - public static void dropKeyspace(String ksName, long timestamp) throws IOException + public static Collection<RowMutation> dropKeyspace(String ksName, long timestamp) throws IOException { - dropKeyspace(ksName, timestamp, true); + return dropKeyspace(ksName, timestamp, true); } /* Migration Helper implementations */ - private static void addKeyspace(KSMetaData ksm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException + private static Collection<RowMutation> addKeyspace(KSMetaData ksm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException { RowMutation keyspaceDef = ksm.toSchema(timestamp); @@ -274,17 +270,24 @@ public class MigrationHelper if (!StorageService.instance.isClientMode()) Table.open(ksm.name); + + return toCollection(keyspaceDef); } - private static void addColumnFamily(CFMetaData cfm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException + private static Collection<RowMutation> addColumnFamily(CFMetaData cfm, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException { KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName); ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm))); Schema.instance.load(cfm); + RowMutation mutation = null; + if (withSchemaRecord) - cfm.toSchema(timestamp).apply(); + { + mutation = cfm.toSchema(timestamp); + mutation.apply(); + } // make sure it's init-ed w/ the old definitions first, // since we're going to call initCf on the new one manually @@ -294,15 +297,19 @@ public class MigrationHelper if (!StorageService.instance.isClientMode()) Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName); + + return toCollection(mutation); } - private static void updateKeyspace(KsDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException + private static Collection<RowMutation> updateKeyspace(KsDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException { KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name); + RowMutation schemaUpdate = null; + if (withSchemaRecord) { - RowMutation schemaUpdate = oldKsm.diff(newState, timestamp); + schemaUpdate = oldKsm.diff(newState, timestamp); schemaUpdate.apply(); } @@ -312,15 +319,19 @@ public class MigrationHelper if (!StorageService.instance.isClientMode()) Table.open(newState.name).createReplicationStrategy(newKsm); + + return toCollection(schemaUpdate); } - private static void updateColumnFamily(CfDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException + private static Collection<RowMutation> updateColumnFamily(CfDef newState, long timestamp, boolean withSchemaRecord) throws ConfigurationException, IOException { CFMetaData cfm = Schema.instance.getCFMetaData(newState.keyspace, newState.name); + RowMutation schemaUpdate = null; + if (withSchemaRecord) { - RowMutation schemaUpdate = cfm.diff(newState, timestamp); + schemaUpdate = cfm.diff(newState, timestamp); schemaUpdate.apply(); } @@ -331,9 +342,11 @@ public class MigrationHelper Table table = Table.open(cfm.ksName); table.getColumnFamilyStore(cfm.cfName).reload(); } + + return toCollection(schemaUpdate); } - private static void dropKeyspace(String ksName, long timestamp, boolean withSchemaRecord) throws IOException + private static Collection<RowMutation> dropKeyspace(String ksName, long timestamp, boolean withSchemaRecord) throws IOException { KSMetaData ksm = Schema.instance.getTableDefinition(ksName); String snapshotName = Table.getTimestampedSnapshotName(ksName); @@ -352,18 +365,23 @@ public class MigrationHelper } } + Collection<RowMutation> mutations = Collections.emptyList(); + if (withSchemaRecord) { - for (RowMutation m : ksm.dropFromSchema(timestamp)) + mutations = ksm.dropFromSchema(timestamp); + for (RowMutation m : mutations) m.apply(); } // remove the table from the static instances. Table.clear(ksm.name); Schema.instance.clearTableDefinition(ksm); + + return mutations; } - private static void dropColumnFamily(String ksName, String cfName, long timestamp, boolean withSchemaRecord) throws IOException + private static Collection<RowMutation> dropColumnFamily(String ksName, String cfName, long timestamp, boolean withSchemaRecord) throws IOException { KSMetaData ksm = Schema.instance.getTableDefinition(ksName); ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName); @@ -374,14 +392,21 @@ public class MigrationHelper Schema.instance.purge(cfm); Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm)); + RowMutation mutation = null; + if (withSchemaRecord) - cfm.dropFromSchema(timestamp).apply(); + { + mutation = cfm.dropFromSchema(timestamp); + mutation.apply(); + } if (!StorageService.instance.isClientMode()) { cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily)); Table.open(ksm.name).dropCf(cfm.cfId); } + + return toCollection(mutation); } private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude) @@ -392,4 +417,9 @@ public class MigrationHelper assert newCfs.size() == ksm.cfMetaData().size() - 1; return KSMetaData.cloneWith(ksm, newCfs); } + + private static Collection<RowMutation> toCollection(RowMutation mutation) + { + return mutation == null ? Collections.<RowMutation>emptyList() : Collections.singleton(mutation); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java b/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java index 77d014f..7852e33 100644 --- a/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java +++ b/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java @@ -18,9 +18,11 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.thrift.CfDef; public class UpdateColumnFamily extends Migration @@ -37,9 +39,9 @@ public class UpdateColumnFamily extends Migration this.newState = newState; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.updateColumnFamily(newState, timestamp); + return MigrationHelper.updateColumnFamily(newState, timestamp); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java b/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java index 86968be..c529bb9 100644 --- a/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java +++ b/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java @@ -18,9 +18,11 @@ package org.apache.cassandra.db.migration; import java.io.IOException; +import java.util.Collection; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.thrift.KsDef; public class UpdateKeyspace extends Migration @@ -40,11 +42,11 @@ public class UpdateKeyspace extends Migration this.newState = newState; } - protected void applyImpl() throws ConfigurationException, IOException + protected Collection<RowMutation> applyImpl() throws ConfigurationException, IOException { - MigrationHelper.updateKeyspace(newState, timestamp); - + Collection<RowMutation> mutations = MigrationHelper.updateKeyspace(newState, timestamp); logger.info("Keyspace updated. Please perform any manual operations."); + return mutations; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dbb1b77/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 4e141b4..2503315 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -920,7 +920,6 @@ public class CassandraServer implements Cassandra.Iface public void runMayThrow() throws Exception { m.apply(); - m.announce(); } });