Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 6874aaa0d -> d049017ac
2.0 compatibility modifications for CASSANDRA-6931 patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6931 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d049017a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d049017a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d049017a Branch: refs/heads/cassandra-2.0 Commit: d049017ac85ce22e7dcf87879e94b386987b19e6 Parents: 6874aaa Author: Aleksey Yeschenko <alek...@apache.org> Authored: Mon Mar 31 12:53:24 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Mar 31 12:53:24 2014 +0300 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 3 ++- .../apache/cassandra/db/BatchlogManager.java | 22 ++++++++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index ff40e65..1f25cea 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -233,7 +233,8 @@ public final class CFMetaData public static final CFMetaData BatchlogCf = compile("CREATE TABLE " + SystemKeyspace.BATCHLOG_CF + " (" + "id uuid PRIMARY KEY," + "written_at timestamp," - + "data blob" + + "data blob," + + "version int," + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0 " + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d049017a/src/java/org/apache/cassandra/db/BatchlogManager.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 23cacca..2e09285 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -66,7 +66,6 @@ import org.apache.cassandra.utils.WrappedRunnable; public class BatchlogManager implements BatchlogManagerMBean { private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager"; - private static final int VERSION = MessagingService.VERSION_12; private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size. @@ -151,7 +150,7 @@ public class BatchlogManager implements BatchlogManagerMBean { out.writeInt(mutations.size()); for (RowMutation rm : mutations) - RowMutation.serializer.serialize(rm, out, VERSION); + RowMutation.serializer.serialize(rm, out, MessagingService.VERSION_12); } catch (IOException e) { @@ -176,7 +175,7 @@ public class BatchlogManager implements BatchlogManagerMBean try { - UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d", + UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, PAGE_SIZE); @@ -188,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean if (page.size() < PAGE_SIZE) break; // we've exhausted the batchlog, next query would be empty. 
- page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", + page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF, id, @@ -213,22 +212,23 @@ public class BatchlogManager implements BatchlogManagerMBean { id = row.getUUID("id"); long writtenAt = row.getLong("written_at"); + int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12; // enough time for the actual write + batchlog entry mutation delivery (two separate requests). long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation if (System.currentTimeMillis() < writtenAt + timeout) continue; // not ready to replay yet, might still get a deletion. - replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter); + replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter); } return id; } - private void replayBatch(UUID id, ByteBuffer data, long writtenAt, RateLimiter rateLimiter) + private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) { logger.debug("Replaying batch {}", id); try { - replaySerializedMutations(data, writtenAt, rateLimiter); + replaySerializedMutations(data, writtenAt, version, rateLimiter); } catch (IOException e) { @@ -247,19 +247,19 @@ public class BatchlogManager implements BatchlogManagerMBean mutation.apply(); } - private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException + private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException { DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); int size = in.readInt(); for (int i = 0; i < size; i++) - replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter); + 
replaySerializedMutation(RowMutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter); } /* * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints * when a replica is down or a write request times out. */ - private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) + private void replaySerializedMutation(RowMutation mutation, long writtenAt, int version, RateLimiter rateLimiter) { int ttl = calculateHintTTL(mutation, writtenAt); if (ttl <= 0) @@ -268,7 +268,7 @@ public class BatchlogManager implements BatchlogManagerMBean Set<InetAddress> liveEndpoints = new HashSet<>(); String ks = mutation.getKeyspaceName(); Token<?> tk = StorageService.getPartitioner().getToken(mutation.key()); - int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION); + int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version); for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))