Serialize batchlog mutations with the version of the target node patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6931
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7ac8f96 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7ac8f96 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7ac8f96 Branch: refs/heads/cassandra-2.1 Commit: b7ac8f96c169a3cfe18dd50ca1f27ce2b21fd78b Parents: f6daf4e Author: Aleksey Yeschenko <alek...@apache.org> Authored: Mon Mar 31 13:09:36 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Mar 31 13:09:36 2014 +0300 ---------------------------------------------------------------------- NEWS.txt | 2 +- .../apache/cassandra/db/BatchlogManager.java | 28 ++++------- .../apache/cassandra/net/MessagingService.java | 6 +-- .../apache/cassandra/service/StorageProxy.java | 51 ++++++++++++-------- .../cassandra/db/BatchlogManagerTest.java | 7 ++- 5 files changed, 50 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 23f6522..b53795e 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -33,7 +33,7 @@ New features Upgrading --------- - - Rolling upgrades from anything pre-2.0.6 is not supported. + - Rolling upgrade from anything pre-2.0.7 is not supported. - For leveled compaction users, 2.0 must be atleast started before upgrading to 2.1 due to the fact that the old JSON leveled manifest is migrated into the sstable metadata files on startup http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 603b9d8..47eb77a 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -44,9 +44,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteTimeoutException; @@ -121,26 +119,23 @@ public class BatchlogManager implements BatchlogManagerMBean batchlogTasks.execute(runnable); } - public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid) + public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version) { - return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros()); + return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros()); } @VisibleForTesting - static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, long now) + static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now) { - ByteBuffer writtenAt = LongType.instance.decompose(now / 1000); - ByteBuffer data = serializeMutations(mutations); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf); - cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now)); - cf.addColumn(new Cell(cellName("data"), data, now)); - cf.addColumn(new Cell(cellName("written_at"), writtenAt, now)); - + CFRowAdder adder = new CFRowAdder(cf, CFMetaData.BatchlogCf.comparator.builder().build(), now); + adder.add("data", serializeMutations(mutations, version)) + .add("written_at", new Date(now / 1000)) + .add("version", version); return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf); } - private static ByteBuffer serializeMutations(Collection<Mutation> mutations) + private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version) { DataOutputBuffer buf = new DataOutputBuffer(); @@ -148,7 +143,7 @@ public class BatchlogManager implements BatchlogManagerMBean { buf.writeInt(mutations.size()); for (Mutation mutation : mutations) - Mutation.serializer.serialize(mutation, buf, MessagingService.VERSION_12); + Mutation.serializer.serialize(mutation, buf, version); } catch (IOException e) { @@ -331,11 +326,6 @@ public class BatchlogManager implements BatchlogManagerMBean return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000); } - private static CellName cellName(String name) - { - return CFMetaData.BatchlogCf.comparator.makeCellName(name); - } - // force flush + compaction to reclaim space from the replayed batches private void cleanup() throws ExecutionException, InterruptedException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 939cf8e..664fa30 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -70,9 +70,9 @@ public final class MessagingService implements MessagingServiceMBean public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions - public static final int VERSION_12 = 6; - public static final int VERSION_20 = 7; - public static final int VERSION_21 = 8; + public static final int VERSION_12 = 6; + public static final int VERSION_20 = 7; + public static final int VERSION_21 = 8; public static final int current_version = VERSION_21; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index f31e092..932ae25 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -607,40 +607,53 @@ public class StorageProxy implements StorageProxyMBean Keyspace.open(Keyspace.SYSTEM_KS), null, WriteType.BATCH_LOG); - updateBatchlog(BatchlogManager.getBatchlogMutationFor(mutations, uuid), endpoints, handler); + + MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) + .createMessage(); + for (InetAddress target : endpoints) + { + int targetVersion = MessagingService.instance().getVersion(target); + if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + { + insertLocal(message.payload, handler); + } + else if (targetVersion == MessagingService.current_version) + { + MessagingService.instance().sendRR(message, target, handler); + } + else + { + MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) + .createMessage(), + target, + handler); + } + } + handler.get(); } private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF)); - cf.delete(new DeletionInfo(FBUtilities.timestampMicros(), (int) (System.currentTimeMillis() / 1000))); AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Keyspace.open(Keyspace.SYSTEM_KS), null, WriteType.SIMPLE); - updateBatchlog(new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf), endpoints, handler); - } - - private static void updateBatchlog(Mutation mutation, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler) - { - if (endpoints.contains(FBUtilities.getBroadcastAddress())) - { - assert endpoints.size() == 1; - insertLocal(mutation, handler); - } - else + Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid)); + mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros()); + MessageOut<Mutation> message = mutation.createMessage(); + for (InetAddress target : endpoints) { - MessageOut<Mutation> message = mutation.createMessage(); - for (InetAddress target : endpoints) + if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) + insertLocal(message.payload, handler); + else MessagingService.instance().sendRR(message, target, handler); } } - private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, - String localDataCenter) + private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter) throws WriteTimeoutException, OverloadedException { for (WriteResponseHandlerWrapper wrapper : wrappers) @@ -650,9 +663,7 @@ public class StorageProxy implements StorageProxyMBean } for (WriteResponseHandlerWrapper wrapper : wrappers) - { wrapper.handler.get(); - } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7ac8f96/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java index 5450fe5..42cd196 100644 --- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.UUIDGen; @@ -66,7 +67,11 @@ public class BatchlogManagerTest extends SchemaLoader long timestamp = System.currentTimeMillis(); if (i < 500) timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2; - BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply(); + BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), + UUIDGen.getTimeUUID(), + MessagingService.current_version, + timestamp * 1000) + .apply(); } assertEquals(1000, BatchlogManager.instance.countAllBatches());