Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 6874aaa0d -> d049017ac


2.0 compatibility modifications for CASSANDRA-6931

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6931


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d049017a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d049017a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d049017a

Branch: refs/heads/cassandra-2.0
Commit: d049017ac85ce22e7dcf87879e94b386987b19e6
Parents: 6874aaa
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Mon Mar 31 12:53:24 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Mar 31 12:53:24 2014 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |  3 ++-
 .../apache/cassandra/db/BatchlogManager.java    | 22 ++++++++++----------
 2 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index ff40e65..1f25cea 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -233,7 +233,8 @@ public final class CFMetaData
     public static final CFMetaData BatchlogCf = compile("CREATE TABLE " + 
SystemKeyspace.BATCHLOG_CF + " ("
                                                         + "id uuid PRIMARY 
KEY,"
                                                         + "written_at 
timestamp,"
-                                                        + "data blob"
+                                                        + "data blob,"
+                                                        + "version int,"
                                                         + ") WITH 
COMMENT='uncommited batches' AND gc_grace_seconds=0 "
                                                         + "AND 
COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java 
b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 23cacca..2e09285 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -66,7 +66,6 @@ import org.apache.cassandra.utils.WrappedRunnable;
 public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=BatchlogManager";
-    private static final int VERSION = MessagingService.VERSION_12;
     private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
     private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out 
using any heuristics. TODO: set based on avg batch size.
 
@@ -151,7 +150,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             out.writeInt(mutations.size());
             for (RowMutation rm : mutations)
-                RowMutation.serializer.serialize(rm, out, VERSION);
+                RowMutation.serializer.serialize(rm, out, 
MessagingService.VERSION_12);
         }
         catch (IOException e)
         {
@@ -176,7 +175,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            UntypedResultSet page = process("SELECT id, data, written_at FROM 
%s.%s LIMIT %d",
+            UntypedResultSet page = process("SELECT id, data, written_at, 
version FROM %s.%s LIMIT %d",
                                             Keyspace.SYSTEM_KS,
                                             SystemKeyspace.BATCHLOG_CF,
                                             PAGE_SIZE);
@@ -188,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (page.size() < PAGE_SIZE)
                     break; // we've exhausted the batchlog, next query would 
be empty.
 
-                page = process("SELECT id, data, written_at FROM %s.%s WHERE 
token(id) > token(%s) LIMIT %d",
+                page = process("SELECT id, data, written_at, version FROM 
%s.%s WHERE token(id) > token(%s) LIMIT %d",
                                Keyspace.SYSTEM_KS,
                                SystemKeyspace.BATCHLOG_CF,
                                id,
@@ -213,22 +212,23 @@ public class BatchlogManager implements 
BatchlogManagerMBean
         {
             id = row.getUUID("id");
             long writtenAt = row.getLong("written_at");
+            int version = row.has("version") ? row.getInt("version") : 
MessagingService.VERSION_12;
             // enough time for the actual write + batchlog entry mutation 
delivery (two separate requests).
             long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // 
enough time for the actual write + BM removal mutation
             if (System.currentTimeMillis() < writtenAt + timeout)
                 continue; // not ready to replay yet, might still get a 
deletion.
-            replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter);
+            replayBatch(id, row.getBytes("data"), writtenAt, version, 
rateLimiter);
         }
         return id;
     }
 
-    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, 
RateLimiter rateLimiter)
+    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int 
version, RateLimiter rateLimiter)
     {
         logger.debug("Replaying batch {}", id);
 
         try
         {
-            replaySerializedMutations(data, writtenAt, rateLimiter);
+            replaySerializedMutations(data, writtenAt, version, rateLimiter);
         }
         catch (IOException e)
         {
@@ -247,19 +247,19 @@ public class BatchlogManager implements 
BatchlogManagerMBean
         mutation.apply();
     }
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt, 
RateLimiter rateLimiter) throws IOException
+    private void replaySerializedMutations(ByteBuffer data, long writtenAt, 
int version, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new 
DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            replaySerializedMutation(RowMutation.serializer.deserialize(in, 
VERSION), writtenAt, rateLimiter);
+            replaySerializedMutation(RowMutation.serializer.deserialize(in, 
version), writtenAt, version, rateLimiter);
     }
 
     /*
      * We try to deliver the mutations to the replicas ourselves if they are 
alive and only resort to writing hints
      * when a replica is down or a write request times out.
      */
-    private void replaySerializedMutation(RowMutation mutation, long 
writtenAt, RateLimiter rateLimiter)
+    private void replaySerializedMutation(RowMutation mutation, long 
writtenAt, int version, RateLimiter rateLimiter)
     {
         int ttl = calculateHintTTL(mutation, writtenAt);
         if (ttl <= 0)
@@ -268,7 +268,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         Set<InetAddress> liveEndpoints = new HashSet<>();
         String ks = mutation.getKeyspaceName();
         Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
-        int mutationSize = (int) 
RowMutation.serializer.serializedSize(mutation, VERSION);
+        int mutationSize = (int) 
RowMutation.serializer.serializedSize(mutation, version);
 
         for (InetAddress endpoint : 
Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                      
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))

Reply via email to