Serialize batchlog mutations with the version of the target node

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6931


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7ac8f96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7ac8f96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7ac8f96

Branch: refs/heads/trunk
Commit: b7ac8f96c169a3cfe18dd50ca1f27ce2b21fd78b
Parents: f6daf4e
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Mon Mar 31 13:09:36 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Mar 31 13:09:36 2014 +0300

----------------------------------------------------------------------
 NEWS.txt                                        |  2 +-
 .../apache/cassandra/db/BatchlogManager.java    | 28 ++++-------
 .../apache/cassandra/net/MessagingService.java  |  6 +--
 .../apache/cassandra/service/StorageProxy.java  | 51 ++++++++++++--------
 .../cassandra/db/BatchlogManagerTest.java       |  7 ++-
 5 files changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 23f6522..b53795e 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -33,7 +33,7 @@ New features
 
 Upgrading
 ---------
-   - Rolling upgrades from anything pre-2.0.6 is not supported.
+   - Rolling upgrade from anything pre-2.0.7 is not supported.
    - For leveled compaction users, 2.0 must be atleast started before
      upgrading to 2.1 due to the fact that the old JSON leveled
      manifest is migrated into the sstable metadata files on startup

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 603b9d8..47eb77a 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -44,9 +44,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -121,26 +119,23 @@ public class BatchlogManager implements BatchlogManagerMBean
         batchlogTasks.execute(runnable);
     }
 
-    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid)
+    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
     {
-        return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+        return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
     }
 
     @VisibleForTesting
-    static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, long now)
+    static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
     {
-        ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
-        ByteBuffer data = serializeMutations(mutations);
-
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-        cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
-        cf.addColumn(new Cell(cellName("data"), data, now));
-        cf.addColumn(new Cell(cellName("written_at"), writtenAt, now));
-
+        CFRowAdder adder = new CFRowAdder(cf, CFMetaData.BatchlogCf.comparator.builder().build(), now);
+        adder.add("data", serializeMutations(mutations, version))
+             .add("written_at", new Date(now / 1000))
+             .add("version", version);
         return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
     }
 
-    private static ByteBuffer serializeMutations(Collection<Mutation> mutations)
+    private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
     {
         DataOutputBuffer buf = new DataOutputBuffer();
 
@@ -148,7 +143,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             buf.writeInt(mutations.size());
             for (Mutation mutation : mutations)
-                Mutation.serializer.serialize(mutation, buf, MessagingService.VERSION_12);
+                Mutation.serializer.serialize(mutation, buf, version);
         }
         catch (IOException e)
         {
@@ -331,11 +326,6 @@ public class BatchlogManager implements BatchlogManagerMBean
         return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
     }
 
-    private static CellName cellName(String name)
-    {
-        return CFMetaData.BatchlogCf.comparator.makeCellName(name);
-    }
-
     // force flush + compaction to reclaim space from the replayed batches
     private void cleanup() throws ExecutionException, InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 939cf8e..664fa30 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -70,9 +70,9 @@ public final class MessagingService implements MessagingServiceMBean
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 
     // 8 bits version, so don't waste versions
-    public static final int VERSION_12  = 6;
-    public static final int VERSION_20  = 7;
-    public static final int VERSION_21  = 8;
+    public static final int VERSION_12 = 6;
+    public static final int VERSION_20 = 7;
+    public static final int VERSION_21 = 8;
     public static final int current_version = VERSION_21;
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f31e092..932ae25 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -607,40 +607,53 @@ public class StorageProxy implements StorageProxyMBean
                                                                         Keyspace.open(Keyspace.SYSTEM_KS),
                                                                         null,
                                                                         WriteType.BATCH_LOG);
-        updateBatchlog(BatchlogManager.getBatchlogMutationFor(mutations, uuid), endpoints, handler);
+
+        MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
+                                                      .createMessage();
+        for (InetAddress target : endpoints)
+        {
+            int targetVersion = MessagingService.instance().getVersion(target);
+            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            {
+                insertLocal(message.payload, handler);
+            }
+            else if (targetVersion == MessagingService.current_version)
+            {
+                MessagingService.instance().sendRR(message, target, handler);
+            }
+            else
+            {
+                MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
+                                                                  .createMessage(),
+                                                   target,
+                                                   handler);
+            }
+        }
+
         handler.get();
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
     {
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF));
-        cf.delete(new DeletionInfo(FBUtilities.timestampMicros(), (int) (System.currentTimeMillis() / 1000)));
         AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ANY,
                                                                         Keyspace.open(Keyspace.SYSTEM_KS),
                                                                         null,
                                                                         WriteType.SIMPLE);
-        updateBatchlog(new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf), endpoints, handler);
-    }
-
-    private static void updateBatchlog(Mutation mutation, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler)
-    {
-        if (endpoints.contains(FBUtilities.getBroadcastAddress()))
-        {
-            assert endpoints.size() == 1;
-            insertLocal(mutation, handler);
-        }
-        else
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid));
+        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
+        MessageOut<Mutation> message = mutation.createMessage();
+        for (InetAddress target : endpoints)
         {
-            MessageOut<Mutation> message = mutation.createMessage();
-            for (InetAddress target : endpoints)
+            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                insertLocal(message.payload, handler);
+            else
                 MessagingService.instance().sendRR(message, target, handler);
         }
     }
 
-    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers,
-                                                  String localDataCenter)
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter)
     throws WriteTimeoutException, OverloadedException
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
@@ -650,9 +663,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         for (WriteResponseHandlerWrapper wrapper : wrappers)
-        {
             wrapper.handler.get();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 5450fe5..42cd196 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -66,7 +67,11 @@ public class BatchlogManagerTest extends SchemaLoader
             long timestamp = System.currentTimeMillis();
             if (i < 500)
                 timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
-            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
+                                                   UUIDGen.getTimeUUID(),
+                                                   MessagingService.current_version,
+                                                   timestamp * 1000)
+                           .apply();
         }
 
         assertEquals(1000, BatchlogManager.instance.countAllBatches());

Reply via email to