merge from 1.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c14e266e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c14e266e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c14e266e Branch: refs/heads/cassandra-1.1 Commit: c14e266eb383a4e10bb945b864ad7d24ca5eb98b Parents: 0d1d3bc 142e8c1 Author: Jonathan Ellis <jbel...@apache.org> Authored: Wed Apr 11 09:05:33 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Wed Apr 11 09:05:33 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 3 + build.xml | 49 ++++++++++- conf/cassandra.yaml | 6 ++ src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 4 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 3 +- src/java/org/apache/cassandra/db/DefsTable.java | 6 +- tools/stress/bin/stress | 16 +--- tools/stress/bin/stress.bat | 4 +- tools/stress/bin/stressd | 16 +--- tools/stress/build.xml | 70 --------------- 11 files changed, 69 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 43ee218,e2fc4de..26315be --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,54 -1,8 +1,57 @@@ -1.0.10 +1.1-dev + * fix read_repair_chance to really default to 0.1 in the cli (CASSANDRA-4114) + * Adds caching and bloomFilterFpChange to CQL options (CASSANDRA-4042) + * Adds posibility to autoconfigure size of the KeyCache (CASSANDRA-4087) + * fix KEYS index from skipping results (CASSANDRA-3996) + * Remove sliced_buffer_size_in_kb dead option (CASSANDRA-4076) + * make loadNewSStable preserve sstable version (CASSANDRA-4077) + * Respect 1.0 cache settings as much as possible when upgrading + (CASSANDRA-4088) + * relax path length requirement for sstable files when upgrading on + non-Windows platforms (CASSANDRA-4110) + * fix terminination of the stress.java when errors were encountered + (CASSANDRA-4128) + * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) +Merged from 1.0: + * add auto_snapshot option allowing disabling snapshot before drop/truncate + (CASSANDRA-3710) * allow short snitch names (CASSANDRA-4130) + ++ +1.1-beta2 + * rename loaded sstables to avoid conflicts with local snapshots + (CASSANDRA-3967) + * start hint replay as soon as FD notifies that the target is back up + (CASSANDRA-3958) + * avoid unproductive deserializing of cached rows during compaction + (CASSANDRA-3921) + * fix concurrency issues with CQL keyspace creation (CASSANDRA-3903) + * Show Effective Owership via Nodetool ring <keyspace> (CASSANDRA-3412) + * Update ORDER BY syntax for CQL3 (CASSANDRA-3925) + * Fix BulkRecordWriter to not throw NPE if reducer gets no map data from Hadoop (CASSANDRA-3944) + * Fix bug with counters in super columns (CASSANDRA-3821) + * Remove deprecated merge_shard_chance (CASSANDRA-3940) + * add a convenient way to reset a node's schema (CASSANDRA-2963) + * fix for intermittent SchemaDisagreementException (CASSANDRA-3884) + * ignore deprecated KsDef/CfDef/ColumnDef fields in native schema (CASSANDRA-3963) + * CLI to report when unsupported column_metadata pair was given (CASSANDRA-3959) + * reincarnate removed and deprecated KsDef/CfDef attributes (CASSANDRA-3953) + * Fix race between writes and read for cache (CASSANDRA-3862) + * perform static initialization of StorageProxy on start-up (CASSANDRA-3797) + * support trickling fsync() on writes (CASSANDRA-3950) + * expose counters for unavailable/timeout exceptions given to thrift clients (CASSANDRA-3671) + * avoid quadratic startup time in LeveledManifest (CASSANDRA-3952) + * Add type information to new schema_ columnfamilies and remove thrift + serialization for schema (CASSANDRA-3792) + * add missing column validator options to the CLI help (CASSANDRA-3926) + * skip reading saved key cache if CF's caching strategy is NONE or ROWS_ONLY (CASSANDRA-3954) + * Unify migration code (CASSANDRA-4017) +Merged from 1.0: * cqlsh: guess correct version of Python for Arch Linux (CASSANDRA-4090) + + +1.0.9 +======= * (CLI) properly handle quotes in create/update keyspace commands (CASSANDRA-4129) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/build.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c14e266e/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/DefsTable.java index ddab690,4ff4077..f9e266b --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@@ -125,438 -41,81 +125,440 @@@ public class DefsTabl // NB: must be an invalid keyspace name public static final ByteBuffer DEFINITION_SCHEMA_COLUMN_NAME = ByteBufferUtil.bytes("Avro/Schema"); - /** dumps current keyspace definitions to storage */ - public static synchronized void dumpToStorage(UUID version) throws IOException + public static final String OLD_MIGRATIONS_CF = "Migrations"; + public static final String OLD_SCHEMA_CF = "Schema"; + + /* dumps current keyspace definitions to storage */ + public static synchronized void dumpToStorage(Collection<KSMetaData> keyspaces) throws IOException { - final ByteBuffer versionKey = Migration.toUTF8Bytes(version); + long timestamp = System.currentTimeMillis(); - // build a list of keyspaces - Collection<String> ksnames = org.apache.cassandra.config.Schema.instance.getNonSystemTables(); + for (KSMetaData ksMetaData : keyspaces) + ksMetaData.toSchema(timestamp).apply(); + } + + /** + * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_CF) + * + * @return Collection of found keyspace definitions + * + * @throws IOException if failed to read SCHEMA_KEYSPACES_CF + */ + public static Collection<KSMetaData> loadFromTable() throws IOException + { + List<Row> serializedSchema = SystemTable.serializedSchema(SystemTable.SCHEMA_KEYSPACES_CF); - // persist keyspaces under new version - RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, versionKey); - long now = System.currentTimeMillis(); - for (String ksname : ksnames) + List<KSMetaData> keyspaces = new ArrayList<KSMetaData>(); + + for (Row row : serializedSchema) { - KSMetaData ksm = org.apache.cassandra.config.Schema.instance.getTableDefinition(ksname); - rm.add(new QueryPath(Migration.SCHEMA_CF, null, ByteBufferUtil.bytes(ksm.name)), SerDeUtils.serialize(ksm.toAvro()), now); + if (row.cf == null || row.cf.isEmpty() || row.cf.isMarkedForDelete()) + continue; + + keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key))); } - // add the schema - rm.add(new QueryPath(Migration.SCHEMA_CF, - null, - DEFINITION_SCHEMA_COLUMN_NAME), - ByteBufferUtil.bytes(org.apache.cassandra.db.migration.avro.KsDef.SCHEMA$.toString()), - now); - rm.apply(); - - // apply new version - rm = new RowMutation(Table.SYSTEM_TABLE, Migration.LAST_MIGRATION_KEY); - rm.add(new QueryPath(Migration.SCHEMA_CF, null, Migration.LAST_MIGRATION_KEY), - ByteBuffer.wrap(UUIDGen.decompose(version)), - now); - rm.apply(); - } - - /** loads a version of keyspace definitions from storage */ + + return keyspaces; + } + + public static ByteBuffer searchComposite(String name, boolean start) + { + assert name != null; + ByteBuffer nameBytes = UTF8Type.instance.decompose(name); + int length = nameBytes.remaining(); + byte[] bytes = new byte[2 + length + 1]; + bytes[0] = (byte)((length >> 8) & 0xFF); + bytes[1] = (byte)(length & 0xFF); + ByteBufferUtil.arrayCopy(nameBytes, 0, bytes, 2, length); + bytes[bytes.length - 1] = (byte)(start ? 0 : 1); + return ByteBuffer.wrap(bytes); + } + + private static Row serializedColumnFamilies(DecoratedKey ksNameKey) + { + ColumnFamilyStore cfsStore = SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF); + return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, new QueryPath(SystemTable.SCHEMA_COLUMNFAMILIES_CF)))); + } + + /** + * Loads a version of keyspace definitions from storage (using old SCHEMA_CF as a data source) + * Note: If definitions where found in SCHEMA_CF this method would load them into new schema handling table KEYSPACE_CF + * + * @param version The version of the latest migration. + * + * @return Collection of found keyspace definitions + * + * @throws IOException if failed to read SCHEMA_CF or failed to deserialize Avro schema + */ public static synchronized Collection<KSMetaData> loadFromStorage(UUID version) throws IOException { - DecoratedKey vkey = StorageService.getPartitioner().decorateKey(Migration.toUTF8Bytes(version)); + DecoratedKey vkey = StorageService.getPartitioner().decorateKey(toUTF8Bytes(version)); Table defs = Table.open(Table.SYSTEM_TABLE); - ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.SCHEMA_CF); - QueryFilter filter = QueryFilter.getIdentityFilter(vkey, new QueryPath(Migration.SCHEMA_CF)); - ColumnFamily cf = cfStore.getColumnFamily(filter); + ColumnFamilyStore cfStore = defs.getColumnFamilyStore(OLD_SCHEMA_CF); + ColumnFamily cf = cfStore.getColumnFamily(QueryFilter.getIdentityFilter(vkey, new QueryPath(OLD_SCHEMA_CF))); IColumn avroschema = cf.getColumn(DEFINITION_SCHEMA_COLUMN_NAME); - if (avroschema == null) - // TODO: more polite way to handle this? - throw new RuntimeException("Cannot read system table! Are you upgrading a pre-release version?"); - ByteBuffer value = avroschema.value(); - Schema schema = Schema.parse(ByteBufferUtil.string(value)); + Collection<KSMetaData> keyspaces = Collections.emptyList(); - // deserialize keyspaces using schema - Collection<KSMetaData> keyspaces = new ArrayList<KSMetaData>(); - for (IColumn column : cf.getSortedColumns()) + if (avroschema != null) { - if (column.name().equals(DEFINITION_SCHEMA_COLUMN_NAME)) - continue; - org.apache.cassandra.db.migration.avro.KsDef ks = SerDeUtils.deserialize(schema, column.value(), new org.apache.cassandra.db.migration.avro.KsDef()); - keyspaces.add(KSMetaData.fromAvro(ks)); + ByteBuffer value = avroschema.value(); + org.apache.avro.Schema schema = org.apache.avro.Schema.parse(ByteBufferUtil.string(value)); + + // deserialize keyspaces using schema + keyspaces = new ArrayList<KSMetaData>(); + + for (IColumn column : cf.getSortedColumns()) + { + if (column.name().equals(DEFINITION_SCHEMA_COLUMN_NAME)) + continue; + KsDef ks = deserializeAvro(schema, column.value(), new KsDef()); + keyspaces.add(Avro.ksFromAvro(ks)); + } + + // store deserialized keyspaces into new place + dumpToStorage(keyspaces); + + logger.info("Truncating deprecated system column families (migrations, schema)..."); + dropColumnFamily(Table.SYSTEM_TABLE, OLD_MIGRATIONS_CF); + dropColumnFamily(Table.SYSTEM_TABLE, OLD_SCHEMA_CF); } + return keyspaces; } - - /** gets all the files that belong to a given column family. */ - public static Set<File> getFiles(String table, final String cf) + + /** + * Merge remote schema in form of row mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param data The data of the message from remote node with schema information + * @param version The version of the message + * + * @throws ConfigurationException If one of metadata attributes has invalid value + * @throws IOException If data was corrupted during transportation or failed to apply fs operations + */ + public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException + { + if (version < MessagingService.VERSION_11) + { + logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first."); + return; + } + + mergeSchema(MigrationManager.deserializeMigrationMessage(data, version)); + } + + public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException + { + // current state of the schema + Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF); + Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF); + + for (RowMutation mutation : mutations) + mutation.apply(); + + if (!StorageService.instance.isClientMode()) + flushSchemaCFs(); + + Schema.instance.updateVersionAndAnnounce(); + + // with new data applied + Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemTable.getSchema(SystemTable.SCHEMA_KEYSPACES_CF); + Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemTable.getSchema(SystemTable.SCHEMA_COLUMNFAMILIES_CF); + + Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); + mergeColumnFamilies(oldColumnFamilies, newColumnFamilies); + + // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted + for (String keyspaceToDrop : keyspacesToDrop) + dropKeyspace(keyspaceToDrop); + + } + + private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated) + throws ConfigurationException, IOException + { + // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty) + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated); + + /** + * At first step we check if any new keyspaces were added. + */ + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) + { + ColumnFamily ksAttrs = entry.getValue(); + + // we don't care about nested ColumnFamilies here because those are going to be processed separately + if (!ksAttrs.isEmpty()) + addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), entry.getValue()), Collections.<CFMetaData>emptyList())); + } + + /** + * At second step we check if there were any keyspaces re-created, in this context + * re-created means that they were previously deleted but still exist in the low-level schema as empty keys + */ + + Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering(); + + // instead of looping over all modified entries and skipping processed keys all the time + // we would rather store "left to process" items and iterate over them removing already met keys + List<DecoratedKey> leftToProcess = new ArrayList<DecoratedKey>(); + + for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : modifiedEntries.entrySet()) + { + ColumnFamily prevValue = entry.getValue().leftValue(); + ColumnFamily newValue = entry.getValue().rightValue(); + + if (prevValue.isEmpty()) + { + addKeyspace(KSMetaData.fromSchema(new Row(entry.getKey(), newValue), Collections.<CFMetaData>emptyList())); + continue; + } + + leftToProcess.add(entry.getKey()); + } + + if (leftToProcess.size() == 0) + return Collections.emptySet(); + + /** + * At final step we updating modified keyspaces and saving keyspaces drop them later + */ + + Set<String> keyspacesToDrop = new HashSet<String>(); + + for (DecoratedKey key : leftToProcess) + { + MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(key); + + ColumnFamily newState = valueDiff.rightValue(); + + if (newState.isEmpty()) + keyspacesToDrop.add(AsciiType.instance.getString(key.key)); + else + updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList())); + } + + return keyspacesToDrop; + } + + private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> old, Map<DecoratedKey, ColumnFamily> updated) + throws ConfigurationException, IOException { - Set<File> found = new HashSet<File>(); - for (String path : DatabaseDescriptor.getAllDataFileLocationsForTable(table)) + // calculate the difference between old and new states (note that entriesOnlyLeft() will be always empty) + MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(old, updated); + + // check if any new Keyspaces with ColumnFamilies were added. + for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) { - File[] dbFiles = new File(path).listFiles(new FileFilter() + ColumnFamily cfAttrs = entry.getValue(); + + if (!cfAttrs.isEmpty()) { - public boolean accept(File pathname) - { - return pathname.getName().startsWith(cf + "-") && pathname.getName().endsWith(".db") && pathname.exists(); - } - }); - found.addAll(Arrays.asList(dbFiles)); + Map<String, CFMetaData> cfDefs = KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), cfAttrs)); + + for (CFMetaData cfDef : cfDefs.values()) + addColumnFamily(cfDef); + } + } + + // deal with modified ColumnFamilies (remember that all of the keyspace nested ColumnFamilies are put to the single row) + Map<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> modifiedEntries = diff.entriesDiffering(); + + for (DecoratedKey keyspace : modifiedEntries.keySet()) + { + MapDifference.ValueDifference<ColumnFamily> valueDiff = modifiedEntries.get(keyspace); + + ColumnFamily prevValue = valueDiff.leftValue(); // state before external modification + ColumnFamily newValue = valueDiff.rightValue(); // updated state + + Row newRow = new Row(keyspace, newValue); + + if (prevValue.isEmpty()) // whole keyspace was deleted and now it's re-created + { + for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(newRow).values()) + addColumnFamily(cfm); + } + else if (newValue.isEmpty()) // whole keyspace is deleted + { + for (CFMetaData cfm : KSMetaData.deserializeColumnFamilies(new Row(keyspace, prevValue)).values()) + dropColumnFamily(cfm.ksName, cfm.cfName); + } + else // has modifications in the nested ColumnFamilies, need to perform nested diff to determine what was really changed + { + String ksName = AsciiType.instance.getString(keyspace.key); + + Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>(); + for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values()) + oldCfDefs.put(cfm.cfName, cfm); + + Map<String, CFMetaData> newCfDefs = KSMetaData.deserializeColumnFamilies(newRow); + + MapDifference<String, CFMetaData> cfDefDiff = Maps.difference(oldCfDefs, newCfDefs); + + for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnRight().values()) + addColumnFamily(cfDef); + + for (CFMetaData cfDef : cfDefDiff.entriesOnlyOnLeft().values()) + dropColumnFamily(cfDef.ksName, cfDef.cfName); + + for (MapDifference.ValueDifference<CFMetaData> cfDef : cfDefDiff.entriesDiffering().values()) + updateColumnFamily(cfDef.rightValue()); + } } - return found; + } + + private static void addKeyspace(KSMetaData ksm) + { + assert Schema.instance.getKSMetaData(ksm.name) == null; + Schema.instance.load(ksm); + + if (!StorageService.instance.isClientMode()) + Table.open(ksm.name); + } + + private static void addColumnFamily(CFMetaData cfm) throws IOException + { + assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null; + KSMetaData ksm = Schema.instance.getTableDefinition(cfm.ksName); + ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm))); + + Schema.instance.load(cfm); + + // make sure it's init-ed w/ the old definitions first, + // since we're going to call initCf on the new one manually + Table.open(cfm.ksName); + + Schema.instance.setTableDefinition(ksm); + + if (!StorageService.instance.isClientMode()) + Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName); + } + + private static void updateKeyspace(KSMetaData newState) throws IOException + { + KSMetaData oldKsm = Schema.instance.getKSMetaData(newState.name); + assert oldKsm != null; + KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values()); + + Schema.instance.setTableDefinition(newKsm); + + try + { + if (!StorageService.instance.isClientMode()) + Table.open(newState.name).createReplicationStrategy(newKsm); + } + catch (ConfigurationException e) + { + // It's too late to throw a configuration exception, we should have catch those previously + throw new RuntimeException(e); + } + } + + private static void updateColumnFamily(CFMetaData newState) throws IOException + { + CFMetaData cfm = Schema.instance.getCFMetaData(newState.ksName, newState.cfName); + assert cfm != null; + cfm.reload(); + + if (!StorageService.instance.isClientMode()) + { + Table table = Table.open(cfm.ksName); + table.getColumnFamilyStore(cfm.cfName).reload(); + } + } + + private static void dropKeyspace(String ksName) throws IOException + { + KSMetaData ksm = Schema.instance.getTableDefinition(ksName); + String snapshotName = Table.getTimestampedSnapshotName(ksName); + + // remove all cfs from the table instance. + for (CFMetaData cfm : ksm.cfMetaData().values()) + { + ColumnFamilyStore cfs = Table.open(ksm.name).getColumnFamilyStore(cfm.cfName); + + Schema.instance.purge(cfm); + + if (!StorageService.instance.isClientMode()) + { - cfs.snapshot(snapshotName); ++ if (DatabaseDescriptor.isAutoSnapshot()) ++ cfs.snapshot(snapshotName); + Table.open(ksm.name).dropCf(cfm.cfId); + } + } + + // remove the table from the static instances. + Table.clear(ksm.name); + Schema.instance.clearTableDefinition(ksm); + } + + private static void dropColumnFamily(String ksName, String cfName) throws IOException + { + KSMetaData ksm = Schema.instance.getTableDefinition(ksName); + assert ksm != null; + ColumnFamilyStore cfs = Table.open(ksName).getColumnFamilyStore(cfName); + assert cfs != null; + + // reinitialize the table. + CFMetaData cfm = ksm.cfMetaData().get(cfName); + + Schema.instance.purge(cfm); + Schema.instance.setTableDefinition(makeNewKeyspaceDefinition(ksm, cfm)); + + if (!StorageService.instance.isClientMode()) + { - cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily)); ++ if (DatabaseDescriptor.isAutoSnapshot()) ++ cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily)); + Table.open(ksm.name).dropCf(cfm.cfId); + } + } + + private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude) + { + // clone ksm but do not include the new def + List<CFMetaData> newCfs = new ArrayList<CFMetaData>(ksm.cfMetaData().values()); + newCfs.remove(toExclude); + assert newCfs.size() == ksm.cfMetaData().size() - 1; + return KSMetaData.cloneWith(ksm, newCfs); + } + + private static void flushSchemaCFs() + { + flushSchemaCF(SystemTable.SCHEMA_KEYSPACES_CF); + flushSchemaCF(SystemTable.SCHEMA_COLUMNFAMILIES_CF); + flushSchemaCF(SystemTable.SCHEMA_COLUMNS_CF); + } + + private static void flushSchemaCF(String cfName) + { + Future<?> flush = SystemTable.schemaCFS(cfName).forceFlush(); + + if (flush != null) + FBUtilities.waitOnFuture(flush); + } + + private static ByteBuffer toUTF8Bytes(UUID version) + { + return ByteBufferUtil.bytes(version.toString()); + } + + /** + * Deserialize a single object based on the given Schema. + * + * @param writer writer's schema + * @param bytes Array to deserialize from + * @param ob An empty object to deserialize into (must not be null). + * + * @return serialized Avro object + * + * @throws IOException if deserialization failed + */ + public static <T extends SpecificRecord> T deserializeAvro(org.apache.avro.Schema writer, ByteBuffer bytes, T ob) throws IOException + { + BinaryDecoder dec = DIRECT_DECODERS.createBinaryDecoder(ByteBufferUtil.getArray(bytes), null); + SpecificDatumReader<T> reader = new SpecificDatumReader<T>(writer); + reader.setExpected(ob.getSchema()); + return reader.read(ob, dec); } }