Author: jbellis
Date: Thu May 26 20:47:16 2011
New Revision: 1128074

URL: http://svn.apache.org/viewvc?rev=1128074&view=rev
Log:
throttle migration replay
patch by jbellis; reviewed by gdusbabek for CASSANDRA-2714

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu May 26 20:47:16 2011
@@ -10,6 +10,7 @@
  * remove no-op HHOM.renameHints (CASSANDRA-2693)
  * clone super columns to avoid modifying them during flush (CASSANDRA-2675)
  * close scrub file handles (CASSANDRA-2669)
+ * throttle migration replay (CASSANDRA-2714)
 
 
 0.7.6

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu May 26 20:47:16 2011
@@ -298,7 +298,12 @@ public abstract class Migration
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY);
         Table defs = Table.open(Table.SYSTEM_TABLE);
         ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
-        QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(MIGRATIONS_CF), ByteBuffer.wrap(UUIDGen.decompose(start)), ByteBuffer.wrap(UUIDGen.decompose(end)), false, 1000);   
+        QueryFilter filter = QueryFilter.getSliceFilter(dkey,
+                                                        new QueryPath(MIGRATIONS_CF),
+                                                        ByteBuffer.wrap(UUIDGen.decompose(start)),
+                                                        ByteBuffer.wrap(UUIDGen.decompose(end)),
+                                                        false,
+                                                        100);
         ColumnFamily cf = cfStore.getColumnFamily(filter);
         return cf.getSortedColumns();
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Thu May 26 20:47:16 2011
@@ -24,8 +24,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.utils.ByteBufferUtil;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,16 +37,21 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class MigrationManager implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
-    
+
+    // avoids re-pushing migrations that we're waiting on target to apply already
+    private static Map<InetAddress,UUID> lastPushed = new MapMaker().expiration(1, TimeUnit.MINUTES).makeMap();
+
     /** I'm not going to act here. */
     public void onJoin(InetAddress endpoint, EndpointState epState) { }
 
@@ -87,8 +94,16 @@ public class MigrationManager implements
         }
         else if (!StorageService.instance.isClientMode())
         {
-            logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString());
-            pushMigrations(theirVersion, myVersion, endpoint);
+            if (lastPushed.get(endpoint) == null || theirVersion.timestamp() >= lastPushed.get(endpoint).timestamp())
+            {
+                logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion);
+                pushMigrations(theirVersion, myVersion, endpoint);
+            }
+            else
+            {
+                logger.debug("Waiting for {} to process migrations up to {} before sending more",
+                             endpoint, lastPushed.get(endpoint));
+            }
         }
     }
 
@@ -172,6 +187,7 @@ public class MigrationManager implements
         {
             Message msg = makeMigrationMessage(migrations);
             MessagingService.instance().sendOneWay(msg, host);
+            lastPushed.put(host, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
         }
         catch (IOException ex)
         {


Reply via email to