Author: gdusbabek
Date: Fri May 21 20:29:54 2010
New Revision: 947163

URL: http://svn.apache.org/viewvc?rev=947163&view=rev
Log:
recover when a migration crashes before system table is flushed. patch by 
gdusbabek, reviewed by stuhood. CASSANDRA-987

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri 
May 21 20:29:54 2010
@@ -22,18 +22,28 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.avro.ipc.SocketServer;
 import org.apache.avro.ipc.HttpServer;
 import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DefsTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Mx4jTool;
 
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,6 +104,16 @@ public class CassandraDaemon {
         // replay the log if necessary and check for compaction candidates
         CommitLog.recover();
         CompactionManager.instance.checkAllColumnFamilies();
+        
+        // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re-apply migrations. this isn't
+        // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution
+        // is to read those migrations from disk and apply them.
+        UUID currentMigration = DatabaseDescriptor.getDefsVersion();
+        UUID lastMigration = Migration.getLastMigrationId();
+        if (lastMigration.timestamp() > currentMigration.timestamp())
+        {
+            MigrationManager.applyMigrations(currentMigration, lastMigration);
+        }
 
         // start server internals
         StorageService.instance.initServer();

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java 
Fri May 21 20:29:54 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db.migration;
 
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.ColumnFamily;
@@ -98,11 +99,11 @@ public abstract class Migration
     public void beforeApplyModels() {}
     
     /** apply changes */
-    public final void apply() throws IOException
+    public final void apply() throws IOException, ConfigurationException
     {
         // ensure migration is serial. don't apply unless the previous version 
matches.
         if (!DatabaseDescriptor.getDefsVersion().equals(lastVersion))
-            throw new IOException("Previous version mismatch. cannot apply.");
+            throw new ConfigurationException("Previous version mismatch. 
cannot apply.");
         // write to schema
         assert rm != null;
         if (!clientMode)
@@ -124,6 +125,10 @@ public abstract class Migration
             migration = new RowMutation(Table.SYSTEM_TABLE, 
LAST_MIGRATION_KEY);
             migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), 
UUIDGen.decompose(newVersion), now);
             migration.apply();
+            
+            // if we fail here, there will be schema changes in the CL that 
will get replayed *AFTER* the schema is loaded.
+            // CassandraDaemon checks for this condition (the stored version 
will be greater than the loaded version)
+            // and calls MigrationManager.applyMigrations(loaded version, 
stored version).
         
             // flush changes out of memtables so we don't need to rely on the 
commit log.
             ColumnFamilyStore[] schemaStores = new ColumnFamilyStore[] {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
Fri May 21 20:29:54 2010
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
@@ -42,8 +43,11 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
@@ -99,6 +103,59 @@ public class MigrationManager implements
         for (InetAddress host : hosts)
             MessagingService.instance.sendOneWay(msg, host);
     }
+
+    /**
+     * gets called during startup if we notice a mismatch between the current migration version and the one saved. This
+     * can only happen as a result of the commit log recovering schema updates, which overwrites lastVersionId.
+     * 
+     * Migrations are applied on the migration stage and awaited. A ConfigurationException from Migration.apply() (the
+     * migration was applied out of order) is silently ignored; any other failure surfaces as an IOException.
+     */
+    public static void applyMigrations(UUID from, UUID to) throws IOException
+    {
+        List<Future> updates = new ArrayList<Future>();
+        Collection<IColumn> migrations = Migration.getLocalMigrations(from, 
to);
+        for (IColumn col : migrations)
+        {
+            final Migration migration = Migration.deserialize(new 
ByteArrayInputStream(col.value()));
+            Future update = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Runnable() 
+            {
+                public void run()
+                {
+                    try
+                    {
+                        migration.apply();
+                    }
+                    catch (ConfigurationException ex)
+                    {
+                        // this happens if we try to apply something that's 
already been applied. ignore and proceed.
+                    }
+                    catch (IOException ex)
+                    {
+                        throw new RuntimeException(ex);
+                    }
+                }
+            });
+            updates.add(update);
+        }
+        
+        // wait on all the updates before proceeding.
+        for (Future f : updates)
+        {
+            try
+            {
+                f.get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new IOException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
     
     /** pushes migrations from this host to another host */
     public static void pushMigrations(UUID from, UUID to, InetAddress host)


Reply via email to