Author: gdusbabek
Date: Wed Jul 28 18:00:18 2010
New Revision: 980141

URL: http://svn.apache.org/viewvc?rev=980141&view=rev
Log:
Run Thrift and JMX migrations on the migration stage. Patch by gdusbabek,
reviewed by stuhood. CASSANDRA-1292

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=980141&r1=980140&r2=980141&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Wed Jul 28 18:00:18 2010
@@ -1660,15 +1660,42 @@ public class StorageService implements I
      */
     public void loadSchemaFromYAML() throws ConfigurationException, IOException
     { 
-        // blow up if there is a schema saved.
-        if (DatabaseDescriptor.getDefsVersion().timestamp() > 0 || 
Migration.getLastMigrationId() != null)
-            throw new ConfigurationException("Cannot load from XML on top of 
pre-existing schemas.");
-        
+        Callable<Migration> call = new Callable<Migration>()
+        {
+            public Migration call() throws Exception
+            {
+                // blow up if there is a schema saved.
+                if (DatabaseDescriptor.getDefsVersion().timestamp() > 0 || 
Migration.getLastMigrationId() != null)
+                    throw new ConfigurationException("Cannot load from XML on 
top of pre-existing schemas.");
+                
+                Migration migration = null;
+                for (KSMetaData table : 
DatabaseDescriptor.readTablesFromYaml())
+                {
+                    migration = new AddKeyspace(table); 
+                    migration.apply();
+                }
+                return migration;
+            }
+        };
         Migration migration = null;
-        for (KSMetaData table : DatabaseDescriptor.readTablesFromYaml())
+        try
+        {
+            migration = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(call).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
         {
-            migration = new AddKeyspace(table); 
-            migration.apply();
+            if (e.getCause() instanceof ConfigurationException)
+                throw (ConfigurationException)e.getCause();
+            else if (e.getCause() instanceof IOException)
+                throw (IOException)e.getCause();
+            else if (e.getCause() instanceof Exception)
+                throw new ConfigurationException(e.getCause().getMessage(), 
(Exception)e.getCause());
+            else
+                throw new RuntimeException(e);
         }
         
         assert DatabaseDescriptor.getDefsVersion().timestamp() > 0;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=980141&r1=980140&r2=980141&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Wed Jul 28 18:00:18 2010
@@ -20,8 +20,12 @@ package org.apache.cassandra.thrift;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.db.migration.Migration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -750,20 +754,52 @@ public class CassandraServer implements 
     {
         requestScheduler.release();
     }
+    
+    // helper method to apply migration on the migration stage. typical 
migration failures will throw an 
+    // InvalidRequestException. atypical failures will throw a 
RuntimeException.
+    private static void applyMigrationOnStage(final Migration m) throws 
InvalidRequestException
+    {
+        Future f = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+        {
+            public Object call() throws Exception
+            {
+                m.apply();
+                m.announce();
+                return null;
+            }
+        });
+        try
+        {
+            f.get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            // this means call() threw an exception. deal with it directly.
+            if (e.getCause() != null)
+            {
+                InvalidRequestException ex = new 
InvalidRequestException(e.getCause().getMessage());
+                ex.initCause(e.getCause());
+                throw ex;
+            }
+            else
+            {
+                InvalidRequestException ex = new 
InvalidRequestException(e.getMessage());
+                ex.initCause(e);
+                throw ex;
+            }
+        }
+    }
 
     public String system_add_column_family(CfDef cf_def) throws 
InvalidRequestException, TException
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
-
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-        
         try
         {
-            AddColumnFamily add = new 
AddColumnFamily(convertToCFMetaData(cf_def));
-            add.apply();
-            add.announce();
+            applyMigrationOnStage(new 
AddColumnFamily(convertToCFMetaData(cf_def)));
             return DatabaseDescriptor.getDefsVersion().toString();
         }
         catch (ConfigurationException e)
@@ -784,15 +820,9 @@ public class CassandraServer implements 
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
         
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-
         try
         {
-            DropColumnFamily drop = new DropColumnFamily(keySpace.get(), 
column_family, true);
-            drop.apply();
-            drop.announce();
+            applyMigrationOnStage(new DropColumnFamily(keySpace.get(), 
column_family, true));
             return DatabaseDescriptor.getDefsVersion().toString();
         }
         catch (ConfigurationException e)
@@ -813,15 +843,9 @@ public class CassandraServer implements 
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
         
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-
         try
         {
-            RenameColumnFamily rename = new RenameColumnFamily(keySpace.get(), 
old_name, new_name);
-            rename.apply();
-            rename.announce();
+            applyMigrationOnStage(new RenameColumnFamily(keySpace.get(), 
old_name, new_name));
             return DatabaseDescriptor.getDefsVersion().toString();
         }
         catch (ConfigurationException e)
@@ -846,10 +870,6 @@ public class CassandraServer implements 
         if (!(DatabaseDescriptor.getAuthenticator() instanceof 
AllowAllAuthenticator))
             throw new InvalidRequestException("Unable to create new keyspace 
while authentication is enabled.");
 
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-
         try
         {
             Collection<CFMetaData> cfDefs = new 
ArrayList<CFMetaData>(ks_def.cf_defs.size());
@@ -863,9 +883,7 @@ public class CassandraServer implements 
                     (Class<? extends 
AbstractReplicationStrategy>)Class.forName(ks_def.strategy_class), 
                     ks_def.replication_factor, 
                     cfDefs.toArray(new CFMetaData[cfDefs.size()]));
-            AddKeyspace add = new AddKeyspace(ksm);
-            add.apply();
-            add.announce();
+            applyMigrationOnStage(new AddKeyspace(ksm));
             return DatabaseDescriptor.getDefsVersion().toString();
         }
         catch (ClassNotFoundException e)
@@ -887,20 +905,14 @@ public class CassandraServer implements 
             throw ex;
         }
     }
-
+    
     public String system_drop_keyspace(String keyspace) throws 
InvalidRequestException, TException
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
         
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-
         try
         {
-            DropKeyspace drop = new DropKeyspace(keyspace, true);
-            drop.apply();
-            drop.announce();
+            applyMigrationOnStage(new DropKeyspace(keyspace, true));
             return DatabaseDescriptor.getDefsVersion().toString();
         }
         catch (ConfigurationException e)
@@ -921,10 +933,6 @@ public class CassandraServer implements 
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
         
-        // if there is anything going on in the migration stage, fail.
-        if 
(StageManager.getStage(StageManager.MIGRATION_STAGE).getQueue().size() > 0)
-            throw new InvalidRequestException("This node appears to be 
handling gossiped migrations.");
-
         try
         {
             RenameKeyspace rename = new RenameKeyspace(old_name, new_name);


Reply via email to