Author: gdusbabek Date: Fri May 21 20:29:54 2010 New Revision: 947163 URL: http://svn.apache.org/viewvc?rev=947163&view=rev Log: recover when a migration crashes before system table is flushed. patch by gdusbabek, reviewed by stuhood. CASSANDRA-987
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=947163&r1=947162&r2=947163&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri May 21 20:29:54 2010 @@ -22,18 +22,28 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.avro.ipc.SocketServer; import org.apache.avro.ipc.HttpServer; import org.apache.avro.specific.SpecificResponder; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.CompactionManager; +import org.apache.cassandra.db.DefsTable; import org.apache.cassandra.db.Table; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.migration.Migration; +import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Mx4jTool; +import org.apache.cassandra.utils.WrappedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +104,16 @@ public class CassandraDaemon { // replay the log if necessary and check for compaction candidates CommitLog.recover(); CompactionManager.instance.checkAllColumnFamilies(); + + // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't + // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution + // is to read those migrations from disk and apply them. + UUID currentMigration = DatabaseDescriptor.getDefsVersion(); + UUID lastMigration = Migration.getLastMigrationId(); + if (lastMigration.timestamp() > currentMigration.timestamp()) + { + MigrationManager.applyMigrations(currentMigration, lastMigration); + } // start server internals StorageService.instance.initServer(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=947163&r1=947162&r2=947163&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Fri May 21 20:29:54 2010 @@ -18,6 +18,7 @@ package org.apache.cassandra.db.migration; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.ColumnFamily; @@ -98,11 +99,11 @@ public abstract class Migration public void beforeApplyModels() {} /** apply changes */ - public final void apply() throws IOException + public final void apply() throws IOException, ConfigurationException { // ensure migration is serial. don't apply unless the previous version matches. if (!DatabaseDescriptor.getDefsVersion().equals(lastVersion)) - throw new IOException("Previous version mismatch. cannot apply."); + throw new ConfigurationException("Previous version mismatch. cannot apply."); // write to schema assert rm != null; if (!clientMode) @@ -124,6 +125,10 @@ public abstract class Migration migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY); migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), UUIDGen.decompose(newVersion), now); migration.apply(); + + // if we fail here, there will be schema changes in the CL that will get replayed *AFTER* the schema is loaded. + // CassandraDaemon checks for this condition (the stored version will be greater than the loaded version) + // and calls MigrationManager.applyMigrations(loaded version, stored version). // flush changes out of memtables so we don't need to rely on the commit log. ColumnFamilyStore[] schemaStores = new ColumnFamilyStore[] { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=947163&r1=947162&r2=947163&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Fri May 21 20:29:54 2010 @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; @@ -42,8 +43,11 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; public class MigrationManager implements IEndpointStateChangeSubscriber { @@ -99,6 +103,59 @@ public class MigrationManager implements for (InetAddress host : hosts) MessagingService.instance.sendOneWay(msg, host); } + + /** + * gets called during startup if we notice a mismatch between the current migration version and the one saved. This + * can only happen as a result of the commit log recovering schema updates, which overwrites lastVersionId. + * + * This method silently eats IOExceptions thrown by Migration.apply() as a result of applying a migration out of + * order. + */ + public static void applyMigrations(UUID from, UUID to) throws IOException + { + List<Future> updates = new ArrayList<Future>(); + Collection<IColumn> migrations = Migration.getLocalMigrations(from, to); + for (IColumn col : migrations) + { + final Migration migration = Migration.deserialize(new ByteArrayInputStream(col.value())); + Future update = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Runnable() + { + public void run() + { + try + { + migration.apply(); + } + catch (ConfigurationException ex) + { + // this happens if we try to apply something that's already been applied. ignore and proceed. + } + catch (IOException ex) + { + throw new RuntimeException(ex); + } + } + }); + updates.add(update); + } + + // wait on all the updates before proceeding. + for (Future f : updates) + { + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + catch (ExecutionException e) + { + throw new IOException(e); + } + } + } /** pushes migrations from this host to another host */ public static void pushMigrations(UUID from, UUID to, InetAddress host)