Author: gdusbabek Date: Fri May 7 22:07:00 2010 New Revision: 942248 URL: http://svn.apache.org/viewvc?rev=942248&view=rev Log: move ks/cf definitions into system keyspace. Patch by Gary Dusbabek, reviewed by Jonathan Ellis. CASSANDRA-1052
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/system/test_thrift_server.py cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri May 7 22:07:00 2010 @@ -296,22 +296,17 @@ public class DatabaseDescriptor "hinted handoff data", 0, false, - 0.01) + 0.01), + new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, new TimeUUIDType(), null, "individual schema mutations", 0, false, 0), + new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, new UTF8Type(), null, "current state of the schema", 0, false, 0) }; KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE, null, -1, systemCfDefs); CFMetaData.map(systemCfDefs[0]); CFMetaData.map(systemCfDefs[1]); + CFMetaData.map(systemCfDefs[2]); + CFMetaData.map(systemCfDefs[3]); tables.put(Table.SYSTEM_TABLE, systemMeta); - CFMetaData[] definitionCfDefs = new CFMetaData[] - { - new CFMetaData(Table.DEFINITIONS, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, new TimeUUIDType(), null, "individual schema mutations", 0, false, 0), - new CFMetaData(Table.DEFINITIONS, Migration.SCHEMA_CF, ColumnFamilyType.Standard, new UTF8Type(), null, "current state of the schema", 0, false, 0) - }; - CFMetaData.map(definitionCfDefs[0]); - CFMetaData.map(definitionCfDefs[1]); - tables.put(Table.DEFINITIONS, new KSMetaData(Table.DEFINITIONS, null, -1, definitionCfDefs)); - // NOTE: make sure that all system CFMs defined by now. calling fixMaxId at this point will set the base id // to a value that leaves room for future system cfms. // TODO: I've left quite a bit of space for more system CFMs to be defined (up to 1000). However, there is no @@ -705,7 +700,6 @@ public class DatabaseDescriptor { List<String> tableslist = new ArrayList<String>(tables.keySet()); tableslist.remove(Table.SYSTEM_TABLE); - tableslist.remove(Table.DEFINITIONS); return Collections.unmodifiableList(tableslist); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri May 7 22:07:00 2010 @@ -370,6 +370,7 @@ public class ColumnFamilyStore implement // if we're not writing to the commit log, we are replaying the log, so marking // the log header with "you can discard anything written before the context" is not valid final int cfId = DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).cfId; + logger_.info("Discarding " + cfId); CommitLog.instance().discardCompletedSegments(cfId, ctx); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DefsTable.java Fri May 7 22:07:00 2010 @@ -47,7 +47,7 @@ public class DefsTable { byte[] versionKey = Migration.toBytes(version); long now = System.currentTimeMillis(); - RowMutation rm = new RowMutation(Table.DEFINITIONS, versionKey); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, versionKey); for (String tableName : DatabaseDescriptor.getNonSystemTables()) { KSMetaData ks = DatabaseDescriptor.getTableDefinition(tableName); @@ -55,7 +55,7 @@ public class DefsTable } rm.apply(); - rm = new RowMutation(Table.DEFINITIONS, Migration.LAST_MIGRATION_KEY); + rm = new RowMutation(Table.SYSTEM_TABLE, Migration.LAST_MIGRATION_KEY); rm.add(new QueryPath(Migration.SCHEMA_CF, null, Migration.LAST_MIGRATION_KEY), UUIDGen.decompose(version), now); rm.apply(); } @@ -64,7 +64,7 @@ public class DefsTable public static synchronized Collection<KSMetaData> loadFromStorage(UUID version) throws IOException { DecoratedKey vkey = StorageService.getPartitioner().decorateKey(Migration.toBytes(version)); - Table defs = Table.open(Table.DEFINITIONS); + Table defs = Table.open(Table.SYSTEM_TABLE); ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.SCHEMA_CF); QueryFilter filter = QueryFilter.getSliceFilter(vkey, new QueryPath(Migration.SCHEMA_CF), "".getBytes(), "".getBytes(), null, false, 1024); ColumnFamily cf = cfStore.getColumnFamily(filter); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri May 7 22:07:00 2010 @@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory; public class Table { public static final String SYSTEM_TABLE = "system"; - public static final String DEFINITIONS = "definitions"; private static final Logger logger = LoggerFactory.getLogger(Table.class); private static final String SNAPSHOT_SUBDIR_NAME = "snapshots"; Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Fri May 7 22:07:00 2010 @@ -46,7 +46,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -113,28 +115,35 @@ public abstract class Migration { long now = System.currentTimeMillis(); byte[] buf = getBytes(); - RowMutation migration = new RowMutation(Table.DEFINITIONS, MIGRATIONS_KEY); + RowMutation migration = new RowMutation(Table.SYSTEM_TABLE, MIGRATIONS_KEY); migration.add(new QueryPath(MIGRATIONS_CF, null, UUIDGen.decompose(newVersion)), buf, now); migration.apply(); // note that we storing this in the system table, which is not replicated, instead of the definitions table, which is. logger.debug("Applying migration " + newVersion.toString()); - migration = new RowMutation(Table.DEFINITIONS, LAST_MIGRATION_KEY); + migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY); migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), UUIDGen.decompose(newVersion), now); migration.apply(); // flush changes out of memtables so we don't need to rely on the commit log. - for (Future f : Table.open(Table.DEFINITIONS).flush()) + ColumnFamilyStore[] schemaStores = new ColumnFamilyStore[] { + Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(Migration.MIGRATIONS_CF), + Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(Migration.SCHEMA_CF) + }; + List<Future> flushes = new ArrayList<Future>(); + for (ColumnFamilyStore cfs : schemaStores) + flushes.add(cfs.forceFlush()); + for (Future f : flushes) { try { f.get(); } - catch (InterruptedException e) + catch (ExecutionException e) { throw new IOException(e); } - catch (ExecutionException e) + catch (InterruptedException e) { throw new IOException(e); } @@ -158,7 +167,7 @@ public abstract class Migration public static UUID getLastMigrationId() { DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY); - Table defs = Table.open(Table.DEFINITIONS); + Table defs = Table.open(Table.SYSTEM_TABLE); ColumnFamilyStore cfStore = defs.getColumnFamilyStore(SCHEMA_CF); QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(SCHEMA_CF), LAST_MIGRATION_KEY); ColumnFamily cf = cfStore.getColumnFamily(filter); @@ -192,7 +201,7 @@ public abstract class Migration static RowMutation makeDefinitionMutation(KSMetaData add, KSMetaData remove, UUID versionId) throws IOException { final long now = System.currentTimeMillis(); - RowMutation rm = new RowMutation(Table.DEFINITIONS, toBytes(versionId)); + RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, toBytes(versionId)); if (remove != null) rm.delete(new QueryPath(SCHEMA_CF, null, remove.name.getBytes()), System.currentTimeMillis()); if (add != null) @@ -253,7 +262,7 @@ public abstract class Migration public static Collection<IColumn> getLocalMigrations(UUID start, UUID end) { DecoratedKey dkey = StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY); - Table defs = Table.open(Table.DEFINITIONS); + Table defs = Table.open(Table.SYSTEM_TABLE); ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF); QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(MIGRATIONS_CF), UUIDGen.decompose(start), UUIDGen.decompose(end), null, false, 1000); ColumnFamily cf = cfStore.getColumnFamily(filter); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri May 7 22:07:00 2010 @@ -293,7 +293,7 @@ public class AntiEntropyService */ public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major) { - if (!major || table.equals(Table.SYSTEM_TABLE) || table.equals(Table.DEFINITIONS)) + if (!major || table.equals(Table.SYSTEM_TABLE)) return new NoopValidator(); if (StorageService.instance.getTokenMetadata().sortedTokens().size() < 1) // gossiper isn't started Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri May 7 22:07:00 2010 @@ -1617,7 +1617,6 @@ public class StorageService implements I // flush system and definition tables. Collection<Future> flushers = new ArrayList<Future>(); flushers.addAll(Table.open(Table.SYSTEM_TABLE).flush()); - flushers.addAll(Table.open(Table.DEFINITIONS).flush()); for (Future f : flushers) { try Modified: cassandra/trunk/test/system/test_thrift_server.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/test/system/test_thrift_server.py (original) +++ cassandra/trunk/test/system/test_thrift_server.py Fri May 7 22:07:00 2010 @@ -1046,7 +1046,7 @@ class TestMutations(ThriftTester): def test_describe_keyspace(self): """ Test keyspace description """ kspaces = client.describe_keyspaces() - assert len(kspaces) == 6, kspaces # ['system', 'Keyspace2', 'Keyspace3', 'Keyspace1', 'Keyspace4', 'definitions'] + assert len(kspaces) == 5, kspaces # ['system', 'Keyspace2', 'Keyspace3', 'Keyspace1', 'Keyspace4'] ks1 = client.describe_keyspace("Keyspace1") assert set(ks1.keys()) == set(['Super1', 'Standard1', 'Standard2', 'StandardLong1', 'StandardLong2', 'Super3', 'Super2', 'Super4']) sysks = client.describe_keyspace("system") Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=942248&r1=942247&r2=942248&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Fri May 7 22:07:00 2010 @@ -26,6 +26,7 @@ import org.apache.cassandra.config.Confi import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogSegment; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.BytesType; @@ -41,6 +42,7 @@ import org.apache.cassandra.utils.FBUtil import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.utils.UUIDGen; +import org.junit.Before; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -57,8 +59,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; public class DefsTest extends CleanupHelper -{ - +{ @Test public void saveAndRestore() throws IOException {