This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new c365b005e2 Introduce MutationJournal for coordinator logs
c365b005e2 is described below
commit c365b005e2be7b1a36809f0c932cc9c520f6ab97
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Thu Jan 9 14:14:44 2025 +0000
Introduce MutationJournal for coordinator logs
patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
CASSANDRA-20353
---
.../apache/cassandra/journal/ActiveSegment.java | 5 +
src/java/org/apache/cassandra/journal/Flusher.java | 15 +-
src/java/org/apache/cassandra/journal/Journal.java | 5 +-
src/java/org/apache/cassandra/journal/Segment.java | 4 +-
.../metrics/CassandraMetricsRegistry.java | 1 +
.../cassandra/service/tracking/MutationId.java | 84 ++++++++
.../service/tracking/MutationJournal.java | 235 +++++++++++++++++++++
.../service/tracking/MutationJournalTest.java | 154 ++++++++++++++
8 files changed, 493 insertions(+), 10 deletions(-)
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index af02885547..30e4e8f27d 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -497,6 +497,11 @@ final class ActiveSegment<K, V> extends Segment<K, V>
{
return start;
}
+
+ RecordPointer recordPointer()
+ {
+ return new RecordPointer(descriptor.timestamp, start);
+ }
}
private int maybeCompleteInProgress()
diff --git a/src/java/org/apache/cassandra/journal/Flusher.java
b/src/java/org/apache/cassandra/journal/Flusher.java
index 2aba68d8ea..4648ebcb61 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -383,7 +383,7 @@ final class Flusher<K, V>
private interface Mode<K, V>
{
- void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc);
+ RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation
alloc);
RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc);
boolean isDurable(RecordPointer recordPointer);
}
@@ -391,13 +391,14 @@ final class Flusher<K, V>
private class BatchMode implements Mode<K, V>
{
@Override
- public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+ public RecordPointer flushAndAwaitDurable(ActiveSegment<K,
V>.Allocation alloc)
{
pending.incrementAndGet();
requestExtraFlush();
alloc.awaitDurable(journal.metrics.waitingOnFlush);
pending.decrementAndGet();
written.incrementAndGet();
+ return alloc.recordPointer();
}
@Override
@@ -405,7 +406,7 @@ final class Flusher<K, V>
{
requestExtraFlush();
written.incrementAndGet();
- return new RecordPointer(alloc.descriptor().timestamp,
alloc.start());
+ return alloc.recordPointer();
}
@Override
@@ -418,19 +419,20 @@ final class Flusher<K, V>
private class GroupMode implements Mode<K, V>
{
@Override
- public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+ public RecordPointer flushAndAwaitDurable(ActiveSegment<K,
V>.Allocation alloc)
{
pending.incrementAndGet();
alloc.awaitDurable(journal.metrics.waitingOnFlush);
pending.decrementAndGet();
written.incrementAndGet();
+ return alloc.recordPointer();
}
@Override
public RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc)
{
written.incrementAndGet();
- return new RecordPointer(alloc.descriptor().timestamp,
alloc.start());
+ return alloc.recordPointer();
}
@Override
@@ -443,7 +445,7 @@ final class Flusher<K, V>
private class PeriodicMode implements Mode<K, V>
{
@Override
- public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+ public RecordPointer flushAndAwaitDurable(ActiveSegment<K,
V>.Allocation alloc)
{
RecordPointer pointer = flushAsync(alloc);
@@ -454,6 +456,7 @@ final class Flusher<K, V>
awaitFsyncAt(expectedFsyncTime,
journal.metrics.waitingOnFlush.time());
pending.decrementAndGet();
}
+ return pointer;
}
@Override
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index bfebb71204..cebfb7a702 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -261,12 +261,12 @@ public class Journal<K, V> implements Shutdownable
segmentPrepared.signalAll(); // Wake up all threads waiting on the
new segment
compactor.shutdown();
compactor.awaitTermination(1, TimeUnit.MINUTES);
+ closeAllSegments();
flusher.shutdown();
closer.shutdown();
releaser.shutdown();
closer.awaitTermination(1, TimeUnit.MINUTES);
releaser.awaitTermination(1, TimeUnit.MINUTES);
- closeAllSegments();
metrics.deregister();
Invariants.checkState(state.compareAndSet(State.SHUTDOWN,
State.TERMINATED),
"Unexpected journal state while trying to
shut down", state);
@@ -446,7 +446,7 @@ public class Journal<K, V> implements Shutdownable
* @param id user-provided record id, expected to roughly correlate with
time and go up
* @param record the record to store
*/
- public void blockingWrite(K id, V record)
+ public RecordPointer blockingWrite(K id, V record)
{
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
@@ -454,6 +454,7 @@ public class Journal<K, V> implements Shutdownable
ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength());
alloc.writeInternal(id, dob.unsafeGetBufferAndFlip());
flusher.flushAndAwaitDurable(alloc);
+ return alloc.recordPointer();
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/journal/Segment.java
b/src/java/org/apache/cassandra/journal/Segment.java
index a5053a9629..05ae0ea86c 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -86,7 +86,7 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
int size = Index.readSize(offsetAndSize);
if (read(offset, size, into))
{
- Invariants.checkState(id.equals(into.key), "Index for %s read
incorrect key: expected %s but read %s", descriptor, id, into.key);
+ Invariants.checkState(id.equals(into.key));
consumer.accept(descriptor.timestamp, offset, id, into.value,
descriptor.userVersion);
return true;
}
@@ -98,7 +98,7 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
long offsetAndSize = index().lookUpLast(id);
if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize),
Index.readSize(offsetAndSize), into))
return false;
- Invariants.checkState(id.equals(into.key), "Index for %s read
incorrect key: expected %s but read %s", descriptor, id, into.key);
+ Invariants.checkState(id.equals(into.key));
return true;
}
diff --git
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 8cf83f5208..d901324c03 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -132,6 +132,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
.add(HintsServiceMetrics.TYPE_NAME)
.add(InternodeInboundMetrics.TYPE_NAME)
.add(InternodeOutboundMetrics.TYPE_NAME)
+
.add(org.apache.cassandra.journal.Metrics.TYPE_NAME)
.add(KeyspaceMetrics.TYPE_NAME)
.add(MemtablePool.TYPE_NAME)
.add(MessagingMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/service/tracking/MutationId.java
b/src/java/org/apache/cassandra/service/tracking/MutationId.java
new file mode 100644
index 0000000000..9132676530
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/tracking/MutationId.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+public class MutationId
+{
+ /**
+ * 4 byte TCM host id + 4 byte host log id packed into a long.
+ * Host log ID is unique within the host, allocated
+ * anew on host restart - one per token range replicated by the host,
+ * persisted on allocation, unique within the host.
+ */
+ public final long logId;
+
+ /**
+ * 4 byte position + 4 byte timestamp packed into a long.
+ * Position is incremented, the timestamp is monotonically non-decreasing.
+ * The position is enough to identify the entry within a coordinator log,
+ * the timestamp is added for correlation purposes.
+ */
+ public final long sequenceId;
+
+ MutationId(long logId, long sequenceId)
+ {
+ this.logId = logId;
+ this.sequenceId = sequenceId;
+ }
+
+ public int hostId()
+ {
+ return (int) (0xffffffffL & (logId >> 32));
+ }
+
+ public int hostLogId()
+ {
+ return (int) (0xffffffffL & logId);
+ }
+
+ public int position()
+ {
+ return (int) (0xffffffffL & (sequenceId >> 32));
+ }
+
+ public int timestamp()
+ {
+ return (int) (0xffffffffL & sequenceId);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof MutationId)) return false;
+ MutationId that = (MutationId) o;
+ return this.logId == that.logId && this.sequenceId == that.sequenceId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Long.hashCode(logId) + 31 * Long.hashCode(sequenceId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MutationId{" + logId + ", " + sequenceId + '}';
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/tracking/MutationJournal.java
b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java
new file mode 100644
index 0000000000..3f00e4387c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.Checksum;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.RecordConsumer;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.SegmentCompactor;
+import org.apache.cassandra.journal.ValueSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class MutationJournal
+{
+ private final Journal<MutationId, Mutation> journal;
+
+ private MutationJournal()
+ {
+ this(new File(DatabaseDescriptor.getCommitLogLocation()), new
JournalParams());
+ }
+
+ @VisibleForTesting
+ MutationJournal(File directory, Params params)
+ {
+ journal = new Journal<>("MutationJournal", directory, params, new
MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop());
+ }
+
+ public void start()
+ {
+ journal.start();
+ }
+
+ public void shutdownBlocking()
+ {
+ journal.shutdown();
+ }
+
+ public RecordPointer write(MutationId id, Mutation mutation)
+ {
+ return journal.blockingWrite(id, mutation);
+ }
+
+ @Nullable
+ public Mutation read(MutationId id)
+ {
+ return journal.readLast(id);
+ }
+
+ public boolean read(MutationId id, RecordConsumer<MutationId> consumer)
+ {
+ return journal.readLast(id, consumer);
+ }
+
+ public void readAll(Iterable<MutationId> ids, Collection<Mutation> into)
+ {
+ for (MutationId id : ids)
+ {
+ Mutation mutation = read(id);
+ Preconditions.checkState(mutation != null);
+ into.add(mutation);
+ }
+ }
+
+ static class JournalParams implements Params
+ {
+ @Override
+ public int segmentSize()
+ {
+ return DatabaseDescriptor.getCommitLogSegmentSize();
+ }
+
+ @Override
+ public FailurePolicy failurePolicy()
+ {
+ return FailurePolicy.STOP;
+ }
+
+ @Override
+ public FlushMode flushMode()
+ {
+ return FlushMode.PERIODIC;
+ }
+
+ @Override
+ public boolean enableCompaction()
+ {
+ return false;
+ }
+
+ @Override
+ public long compactionPeriod(TimeUnit units)
+ {
+ return 0;
+ }
+
+ @Override
+ public long flushPeriod(TimeUnit units)
+ {
+ return units.convert(DatabaseDescriptor.getCommitLogSyncPeriod(),
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public long periodicBlockPeriod(TimeUnit units)
+ {
+ return
units.convert(DatabaseDescriptor.getPeriodicCommitLogSyncBlock(),
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int userVersion()
+ {
+ return MessagingService.current_version;
+ }
+ }
+
+ static class MutationIdSupport implements KeySupport<MutationId>
+ {
+ static final int LOG_ID_OFFSET = 0;
+ static final int SEQUENCE_ID_OFFSET = LOG_ID_OFFSET +
TypeSizes.LONG_SIZE;
+
+ @Override
+ public int serializedSize(int userVersion)
+ {
+ return TypeSizes.LONG_SIZE // logId
+ + TypeSizes.LONG_SIZE; // sequenceId
+ }
+
+ @Override
+ public void serialize(MutationId id, DataOutputPlus out, int
userVersion) throws IOException
+ {
+ out.writeLong(id.logId);
+ out.writeLong(id.sequenceId);
+ }
+
+ @Override
+ public void serialize(MutationId id, ByteBuffer out, int userVersion)
throws IOException
+ {
+ out.putLong(id.logId);
+ out.putLong(id.sequenceId);
+ }
+
+ @Override
+ public MutationId deserialize(DataInputPlus in, int userVersion)
throws IOException
+ {
+ long logId = in.readLong();
+ long sequenceId = in.readLong();
+ return new MutationId(logId, sequenceId);
+ }
+
+ @Override
+ public MutationId deserialize(ByteBuffer buffer, int position, int
userVersion)
+ {
+ long logId = buffer.getLong(position + LOG_ID_OFFSET);
+ long sequenceId = buffer.getLong(position + SEQUENCE_ID_OFFSET);
+ return new MutationId(logId, sequenceId);
+ }
+
+ @Override
+ public MutationId deserialize(ByteBuffer buffer, int userVersion)
+ {
+ long logId = buffer.getLong();
+ long sequenceId = buffer.getLong();
+ return new MutationId(logId, sequenceId);
+ }
+
+ @Override
+ public void updateChecksum(Checksum crc, MutationId id, int
userVersion)
+ {
+ FBUtilities.updateChecksumLong(crc, id.logId);
+ FBUtilities.updateChecksumLong(crc, id.sequenceId);
+ }
+
+ @Override
+ public int compareWithKeyAt(MutationId id, ByteBuffer buffer, int
position, int userVersion)
+ {
+ int cmp = Long.compare(id.logId, buffer.getLong(position +
LOG_ID_OFFSET));
+ return cmp != 0 ? cmp : Long.compare(id.sequenceId,
buffer.getLong(position + SEQUENCE_ID_OFFSET));
+ }
+
+ @Override
+ public int compare(MutationId id1, MutationId id2)
+ {
+ int cmp = Long.compare(id1.logId, id2.logId);
+ return cmp != 0 ? cmp : Long.compare(id1.sequenceId,
id2.sequenceId);
+ }
+ }
+
+ static class MutationSerializer implements ValueSerializer<MutationId,
Mutation>
+ {
+ @Override
+ public void serialize(MutationId id, Mutation mutation, DataOutputPlus
out, int userVersion) throws IOException
+ {
+ Mutation.serializer.serialize(mutation, out, userVersion);
+ }
+
+ @Override
+ public Mutation deserialize(MutationId id, DataInputPlus in, int
userVersion) throws IOException
+ {
+ return Mutation.serializer.deserialize(in, userVersion);
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java
b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java
new file mode 100644
index 0000000000..55e97f57bd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to sanity-check the integration points with Journal
+ * (mutation id and mutation ser/de, comparison, etc.)
+ */
+public class MutationJournalTest
+{
+ private static final String KEYSPACE = "mjtks";
+ private static final String TABLE = "mjtt";
+
+ private static MutationJournal journal;
+
+ @BeforeClass
+ public static void setUp() throws IOException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3),
+ TableMetadata.builder(KEYSPACE, TABLE)
+ .addPartitionKeyColumn("pk",
UTF8Type.instance)
+ .addClusteringColumn("ck",
UTF8Type.instance)
+ .addRegularColumn("value",
UTF8Type.instance)
+ .build());
+
+ File directory = new
File(Files.createTempDirectory("mutation-journal-test-simple"));
+ directory.deleteRecursiveOnExit();
+
+ journal = new MutationJournal(directory, TestParams.INSTANCE);
+ journal.start();
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ journal.shutdownBlocking();
+ }
+
+ @Test
+ public void testWriteOneReadOne()
+ {
+ Mutation expected =
+ new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE,
TABLE), 0, "key")
+ .clustering("ck")
+ .add("value", "value")
+ .build();
+
+ MutationId id = new MutationId(100L, 0);
+ journal.write(id, expected);
+
+ // regular read
+ Mutation actual = journal.read(id);
+ assertMutationEquals(expected, actual);
+
+ // read via RecordConsumer
+ journal.read(id, ((segment, position, key, buffer, userVersion) ->
+ {
+ assertEquals(id, key);
+ assertEquals(serialize(expected), buffer);
+ }));
+ }
+
+ @Test
+ public void testWriteManyReadMany()
+ {
+ Mutation expected1 =
+ new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE,
TABLE), 0, "key1")
+ .clustering("ck1")
+ .add("value", "value1")
+ .build();
+ Mutation expected2 =
+ new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE,
TABLE), 0, "key2")
+ .clustering("ck2")
+ .add("value", "value2")
+ .build();
+ List<Mutation> expected = List.of(expected1, expected2);
+
+ MutationId id1 = new MutationId(100L, 1);
+ MutationId id2 = new MutationId(100L, 2);
+ List<MutationId> ids = List.of(id1, id2);
+
+ journal.write(id1, expected1);
+ journal.write(id2, expected2);
+
+ List<Mutation> actual = new ArrayList<>();
+ journal.readAll(ids, actual);
+ assertMutationsEqual(expected, actual);
+ }
+
+ private static void assertMutationEquals(Mutation expected, Mutation
actual)
+ {
+ assertEquals(serialize(expected), serialize(actual));
+ }
+
+ private static void assertMutationsEqual(List<Mutation> expected,
List<Mutation> actual)
+ {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++)
+ assertMutationEquals(expected.get(i), actual.get(i));
+ }
+
+ private static ByteBuffer serialize(Mutation mutation)
+ {
+ try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+ {
+ Mutation.serializer.serialize(mutation, out,
MessagingService.maximum_version);
+ return out.asNewBuffer();
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]