Author: gdusbabek
Date: Wed Jul 28 18:00:18 2010
New Revision: 980141

URL: http://svn.apache.org/viewvc?rev=980141&view=rev
Log:
run thrift and jmx migrations on migration stage. patch by gdusbabek, reviewed by stuhood. CASSANDRA-1292

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=980141&r1=980140&r2=980141&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jul 28 18:00:18 2010 @@ -1660,15 +1660,42 @@ public class StorageService implements I */ public void loadSchemaFromYAML() throws ConfigurationException, IOException { - // blow up if there is a schema saved. - if (DatabaseDescriptor.getDefsVersion().timestamp() > 0 || Migration.getLastMigrationId() != null) - throw new ConfigurationException("Cannot load from XML on top of pre-existing schemas."); - + Callable<Migration> call = new Callable<Migration>() + { + public Migration call() throws Exception + { + // blow up if there is a schema saved. 
+ if (DatabaseDescriptor.getDefsVersion().timestamp() > 0 || Migration.getLastMigrationId() != null) + throw new ConfigurationException("Cannot load from XML on top of pre-existing schemas."); + + Migration migration = null; + for (KSMetaData table : DatabaseDescriptor.readTablesFromYaml()) + { + migration = new AddKeyspace(table); + migration.apply(); + } + return migration; + } + }; Migration migration = null; - for (KSMetaData table : DatabaseDescriptor.readTablesFromYaml()) + try + { + migration = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(call).get(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (ExecutionException e) { - migration = new AddKeyspace(table); - migration.apply(); + if (e.getCause() instanceof ConfigurationException) + throw (ConfigurationException)e.getCause(); + else if (e.getCause() instanceof IOException) + throw (IOException)e.getCause(); + else if (e.getCause() instanceof Exception) + throw new ConfigurationException(e.getCause().getMessage(), (Exception)e.getCause()); + else + throw new RuntimeException(e); } assert DatabaseDescriptor.getDefsVersion().timestamp() > 0; Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=980141&r1=980140&r2=980141&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Jul 28 18:00:18 2010 @@ -20,8 +20,12 @@ package org.apache.cassandra.thrift; import java.io.IOException; import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import org.apache.cassandra.db.migration.Migration; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -750,20 +754,52 @@ public class CassandraServer implements { requestScheduler.release(); } + + // helper method to apply migration on the migration stage. typical migration failures will throw an + // InvalidRequestException. atypical failures will throw a RuntimeException. + private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException + { + Future f = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable() + { + public Object call() throws Exception + { + m.apply(); + m.announce(); + return null; + } + }); + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (ExecutionException e) + { + // this means call() threw an exception. deal with it directly. + if (e.getCause() != null) + { + InvalidRequestException ex = new InvalidRequestException(e.getCause().getMessage()); + ex.initCause(e.getCause()); + throw ex; + } + else + { + InvalidRequestException ex = new InvalidRequestException(e.getMessage()); + ex.initCause(e); + throw ex; + } + } + } public String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException { checkKeyspaceAndLoginAuthorized(AccessLevel.FULL); - - // if there is anything going on in the migration stage, fail. - if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { - AddColumnFamily add = new AddColumnFamily(convertToCFMetaData(cf_def)); - add.apply(); - add.announce(); + applyMigrationOnStage(new AddColumnFamily(convertToCFMetaData(cf_def))); return DatabaseDescriptor.getDefsVersion().toString(); } catch (ConfigurationException e) @@ -784,15 +820,9 @@ public class CassandraServer implements { checkKeyspaceAndLoginAuthorized(AccessLevel.FULL); - // if there is anything going on in the migration stage, fail. 
- if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { - DropColumnFamily drop = new DropColumnFamily(keySpace.get(), column_family, true); - drop.apply(); - drop.announce(); + applyMigrationOnStage(new DropColumnFamily(keySpace.get(), column_family, true)); return DatabaseDescriptor.getDefsVersion().toString(); } catch (ConfigurationException e) @@ -813,15 +843,9 @@ public class CassandraServer implements { checkKeyspaceAndLoginAuthorized(AccessLevel.FULL); - // if there is anything going on in the migration stage, fail. - if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { - RenameColumnFamily rename = new RenameColumnFamily(keySpace.get(), old_name, new_name); - rename.apply(); - rename.announce(); + applyMigrationOnStage(new RenameColumnFamily(keySpace.get(), old_name, new_name)); return DatabaseDescriptor.getDefsVersion().toString(); } catch (ConfigurationException e) @@ -846,10 +870,6 @@ public class CassandraServer implements if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator)) throw new InvalidRequestException("Unable to create new keyspace while authentication is enabled."); - // if there is anything going on in the migration stage, fail. - if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size()); @@ -863,9 +883,7 @@ public class CassandraServer implements (Class<? 
extends AbstractReplicationStrategy>)Class.forName(ks_def.strategy_class), ks_def.replication_factor, cfDefs.toArray(new CFMetaData[cfDefs.size()])); - AddKeyspace add = new AddKeyspace(ksm); - add.apply(); - add.announce(); + applyMigrationOnStage(new AddKeyspace(ksm)); return DatabaseDescriptor.getDefsVersion().toString(); } catch (ClassNotFoundException e) @@ -887,20 +905,14 @@ public class CassandraServer implements throw ex; } } - + public String system_drop_keyspace(String keyspace) throws InvalidRequestException, TException { checkKeyspaceAndLoginAuthorized(AccessLevel.FULL); - // if there is anything going on in the migration stage, fail. - if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { - DropKeyspace drop = new DropKeyspace(keyspace, true); - drop.apply(); - drop.announce(); + applyMigrationOnStage(new DropKeyspace(keyspace, true)); return DatabaseDescriptor.getDefsVersion().toString(); } catch (ConfigurationException e) @@ -921,10 +933,6 @@ public class CassandraServer implements { checkKeyspaceAndLoginAuthorized(AccessLevel.FULL); - // if there is anything going on in the migration stage, fail. - if (StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0) - throw new InvalidRequestException("This node appears to be handling gossiped migrations."); - try { RenameKeyspace rename = new RenameKeyspace(old_name, new_name);