This is an automated email from the ASF dual-hosted git repository.

jake pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 227409d920 Add Mutation Serialization Caching
227409d920 is described below

commit 227409d9201fa1aeb9f80b22f499577aedfe25bc
Author: Jake Luciani <j...@datastax.com>
AuthorDate: Mon Dec 19 16:05:12 2022 -0500

    Add Mutation Serialization Caching
    
    Patch by T Jake Luciani; Reviewed by Josh McKenzie for CASSANDRA-17998
    
    Co-authored-by: T Jake Luciani (j...@apache.org)
    Co-authored-by: Mike Adamson (mikeat...@gmail.com)
    Co-authored-by: Robert Stupp (sn...@apache.org)
---
 CHANGES.txt                                        |   1 +
 .../config/CassandraRelevantProperties.java        |   3 +
 src/java/org/apache/cassandra/db/Mutation.java     | 202 ++++++++++++++++--
 .../apache/cassandra/io/util/TeeDataInputPlus.java | 225 +++++++++++++++++++++
 .../org/apache/cassandra/net/MessagingService.java |  20 ++
 .../org/apache/cassandra/service/StorageProxy.java |   9 +
 .../cassandra/utils/TeeDataInputPlusTest.java      | 141 +++++++++++++
 7 files changed, 580 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 621d84a20d..11de4309e8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add Mutation Serialization Caching (CASSANDRA-17998)
  * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874)
  * CEP-10: Simulator Java11 Support (CASSANDRA-17178)
  * Set the major compaction type correctly for compactionstats (CASSANDRA-18055)
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 8ab3cb7e48..41de975475 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -238,6 +238,9 @@ public enum CassandraRelevantProperties
     SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"),
     SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf", 
"3"),
 
+    /** Represents the maximum size (in bytes) of a serialized mutation that 
can be cached **/
+    
CACHEABLE_MUTATION_SIZE_LIMIT("cassandra.cacheable_mutation_size_limit_bytes", 
Long.toString(1_000_000)),
+
     MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
     
MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step",
 "100000"),
     MEMTABLE_TRIE_SIZE_LIMIT("cassandra.trie_size_limit_mb"),
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 7b6a68670f..b9ac32498f 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -23,20 +23,28 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.TeeDataInputPlus;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -64,6 +72,15 @@ public class Mutation implements IMutation, Supplier<Mutation>
 
     private final boolean cdcEnabled;
 
+    private static final int SERIALIZATION_VERSION_COUNT = MessagingService.Version.values().length;
+    // Contains serialized representations of this mutation.
+    // Note: there is no functionality to clear/remove serialized instances, because a mutation must never
+    // be modified (e.g. calling add(PartitionUpdate)) when it's being serialized.
+    private final Serialization[] cachedSerializations = new Serialization[SERIALIZATION_VERSION_COUNT];
+
+    /** @see CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT */
+    private static final long CACHEABLE_MUTATION_SIZE_LIMIT = CassandraRelevantProperties.CACHEABLE_MUTATION_SIZE_LIMIT.getLong();
+
     public Mutation(PartitionUpdate update)
     {
         this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc);
@@ -298,6 +315,7 @@ public class Mutation implements IMutation, Supplier<Mutation>
         }
         return buff.append("])").toString();
     }
+
     private int serializedSize30;
     private int serializedSize3014;
     private int serializedSize40;
@@ -393,34 +411,115 @@ public class Mutation implements IMutation, Supplier<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException
         {
+            serialization(mutation, version).serialize(PartitionUpdate.serializer, mutation, out, version);
+        }
+
+        /**
+         * Called early during request processing to prevent {@link #serialization(Mutation, int)} from being called concurrently.
+         * See {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         */
+        @SuppressWarnings("JavadocReference")
+        public void prepareSerializedBuffer(Mutation mutation, int version)
+        {
+            serialization(mutation, version);
+        }
+
+        /**
+         * Retrieve the cached serialization of this mutation, or compute and cache said serialization if it doesn't
+         * exist yet. Note that this method is _not_ synchronized even though it may (and will often) be called
+         * concurrently. Concurrent calls are still safe; the only risk is that, while the value is not cached yet,
+         * multiple concurrent calls may compute it multiple times instead of just once. This is ok in practice
+         * as we make sure this doesn't happen in the hot path by forcing the initial caching in
+         * {@link org.apache.cassandra.service.StorageProxy#sendToHintedReplicas(Mutation, ReplicaPlan.ForWrite, AbstractWriteResponseHandler, String, Stage)}
+         * via {@link #prepareSerializedBuffer(Mutation, int)}.
+         */
+        @SuppressWarnings("JavadocReference")
+        private Serialization serialization(Mutation mutation, int version)
+        {
+            int versionOrdinal = MessagingService.getVersionOrdinal(version);
+            // Retrieve the cached serialization, or build and cache it if it's not cached already.
+            Serialization serialization = mutation.cachedSerializations[versionOrdinal];
+            if (serialization == null)
+            {
+                serialization = new SizeOnlyCacheableSerialization();
+                long serializedSize = serialization.serializedSize(PartitionUpdate.serializer, mutation, version);
+
+                // Excessively large mutation objects cause GC pressure and huge allocations when serialized,
+                // so we only cache serialized mutations when they are below the defined limit.
+                if (serializedSize < CACHEABLE_MUTATION_SIZE_LIMIT)
+                {
+                    try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
+                    {
+                        serializeInternal(PartitionUpdate.serializer, mutation, dob, version);
+                        serialization = new CachedSerialization(dob.toByteArray());
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+                mutation.cachedSerializations[versionOrdinal] = serialization;
+            }
+
+            return serialization;
+        }
+
+        static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializer,
+                                         Mutation mutation,
+                                         DataOutputPlus out,
+                                         int version) throws IOException
+        {
+            Map<TableId, PartitionUpdate> modifications = mutation.modifications;
+
             /* serialize the modifications in the mutation */
-            int size = mutation.modifications.size();
+            int size = modifications.size();
             out.writeUnsignedVInt(size);
 
             assert size > 0;
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
+            for (PartitionUpdate partitionUpdate : modifications.values())
+            {
+                serializer.serialize(partitionUpdate, out, version);
+            }
         }
 
         public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
         {
-            int size = (int)in.readUnsignedVInt();
-            assert size > 0;
-
-            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag);
-            if (size == 1)
-                return new Mutation(update);
-
-            ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
-            DecoratedKey dk = update.partitionKey();
-
-            modifications.put(update.metadata().id, update);
-            for (int i = 1; i < size; ++i)
+            Mutation m;
+            TeeDataInputPlus teeIn;
+            try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
             {
-                update = PartitionUpdate.serializer.deserialize(in, version, flag);
-                modifications.put(update.metadata().id, update);
+                teeIn = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT);
+
+                int size = (int) teeIn.readUnsignedVInt();
+                assert size > 0;
+
+                PartitionUpdate update = PartitionUpdate.serializer.deserialize(teeIn, version, flag);
+                if (size == 1)
+                {
+                    m = new Mutation(update);
+                }
+                else
+                {
+                    ImmutableMap.Builder<TableId, PartitionUpdate> modifications = new ImmutableMap.Builder<>();
+                    DecoratedKey dk = update.partitionKey();
+
+                    modifications.put(update.metadata().id, update);
+                    for (int i = 1; i < size; ++i)
+                    {
+                        update = PartitionUpdate.serializer.deserialize(teeIn, version, flag);
+                        modifications.put(update.metadata().id, update);
+                    }
+                    m = new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now());
+                }
+
+                // Only cache serializations that don't hit the limit
+                if (!teeIn.isLimitReached())
+                    m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new CachedSerialization(dob.toByteArray());
+
+                return m;
             }
-            return new Mutation(update.metadata().keyspace, dk, modifications.build(), approxTime.now());
         }
 
         public Mutation deserialize(DataInputPlus in, int version) throws IOException
@@ -430,10 +529,71 @@ public class Mutation implements IMutation, Supplier<Mutation>
 
         public long serializedSize(Mutation mutation, int version)
         {
-            int size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
-            for (Map.Entry<TableId, PartitionUpdate> entry : mutation.modifications.entrySet())
-                size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version);
+            return serialization(mutation, version).serializedSize(PartitionUpdate.serializer, mutation, version);
+        }
+    }
+
+    /**
+     * There are two implementations of this class: one that keeps the serialized representation on-heap for later
+     * reuse and one that doesn't. Keeping mutations of all sizes around may lead to "bad" GC pressure (G1 GC) due to humongous objects.
+     * By default, serialized mutations up to 1,000,000 bytes are kept on-heap - see {@link org.apache.cassandra.config.CassandraRelevantProperties#CACHEABLE_MUTATION_SIZE_LIMIT}.
+     */
+    private static abstract class Serialization
+    {
+        abstract void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException;
+
+        abstract long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version);
+    }
+
+    /**
+     * Represents the cached serialization of a {@link Mutation} as a {@code byte[]}.
+     */
+    private static final class CachedSerialization extends Serialization
+    {
+        private final byte[] serialized;
+
+        CachedSerialization(byte[] serialized)
+        {
+            this.serialized = Preconditions.checkNotNull(serialized);
+        }
+
+        @Override
+        void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException
+        {
+            out.write(serialized);
+        }
 
+        @Override
+        long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version)
+        {
+            return serialized.length;
+        }
+    }
+
+    /**
+     * Represents a non-cacheable serialization of a {@link Mutation}; only the size of the mutation is lazily cached.
+     */
+    private static final class SizeOnlyCacheableSerialization extends Serialization
+    {
+        private volatile long size;
+
+        @Override
+        void serialize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, DataOutputPlus out, int version) throws IOException
+        {
+            MutationSerializer.serializeInternal(serializer, mutation, out, version);
+        }
+
+        @Override
+        long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutation mutation, int version)
+        {
+            long size = this.size;
+            if (size == 0L)
+            {
+                size = TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
+                for (PartitionUpdate partitionUpdate : mutation.modifications.values())
+                    size += serializer.serializedSize(partitionUpdate, version);
+                this.size = size;
+            }
             return size;
         }
     }
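
A simplified, self-contained sketch of the caching scheme implemented above: one lazily filled slot per serialization version, with payloads over the limit left uncached (the real patch stores a size-only placeholder, SizeOnlyCacheableSerialization, rather than leaving the slot empty). The class and names below are illustrative only, not Cassandra APIs:

    import java.nio.charset.StandardCharsets;
    import java.util.function.IntFunction;

    final class VersionedSerializationCache
    {
        private static final long SIZE_LIMIT = 1_000_000;

        // One slot per serialization version; filled lazily and never cleared,
        // mirroring Mutation.cachedSerializations.
        private final byte[][] cached;
        private final IntFunction<byte[]> serializer;

        VersionedSerializationCache(int versionCount, IntFunction<byte[]> serializer)
        {
            this.cached = new byte[versionCount][];
            this.serializer = serializer;
        }

        byte[] serialized(int versionOrdinal)
        {
            byte[] bytes = cached[versionOrdinal];
            if (bytes == null)
            {
                bytes = serializer.apply(versionOrdinal);
                // Only keep small payloads on-heap; anything larger is re-serialized on demand.
                if (bytes.length < SIZE_LIMIT)
                    cached[versionOrdinal] = bytes;
            }
            return bytes;
        }

        public static void main(String[] args)
        {
            VersionedSerializationCache cache =
                new VersionedSerializationCache(3, v -> ("payload-v" + v).getBytes(StandardCharsets.UTF_8));
            System.out.println(cache.serialized(0).length); // computed on the first call, reused afterwards
        }
    }

As in the patch, concurrent first calls may each compute the bytes, which is harmless; StorageProxy avoids that on the hot path by warming the current version up front.
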
diff --git a/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java b/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java
new file mode 100644
index 0000000000..b16c1cc5f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/TeeDataInputPlus.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * DataInput that also stores the raw inputs into an output buffer.
+ * This is useful for storing serialized buffers as they are deserialized.
+ *
+ * Note: If a non-zero limit is supplied, it is important for callers to check {@link #isLimitReached()}
+ * before using the tee buffer, as it could be cropped.
+ */
+public class TeeDataInputPlus implements DataInputPlus
+{
+    private final DataInputPlus source;
+    private final DataOutputPlus teeBuffer;
+
+    private final long limit;
+    private boolean limitReached;
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer)
+    {
+        this(source, teeBuffer, 0);
+    }
+
+    public TeeDataInputPlus(DataInputPlus source, DataOutputPlus teeBuffer, long limit)
+    {
+        assert source != null && teeBuffer != null;
+        this.source = source;
+        this.teeBuffer = teeBuffer;
+        this.limit = limit;
+        this.limitReached = false;
+    }
+
+    private void maybeWrite(int length, Throwables.DiscreteAction<IOException> writeAction) throws IOException
+    {
+        if (limit <= 0 || (!limitReached && (teeBuffer.position() + length) < limit))
+            writeAction.perform();
+        else
+            limitReached = true;
+    }
+
+    @Override
+    public void readFully(byte[] bytes) throws IOException
+    {
+        source.readFully(bytes);
+        maybeWrite(bytes.length, () -> teeBuffer.write(bytes));
+    }
+
+    @Override
+    public void readFully(byte[] bytes, int offset, int length) throws IOException
+    {
+        source.readFully(bytes, offset, length);
+        maybeWrite(length, () -> teeBuffer.write(bytes, offset, length));
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException
+    {
+        for (int i = 0; i < n; i++)
+        {
+            try
+            {
+                byte v = source.readByte();
+                maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+            }
+            catch (EOFException eof)
+            {
+                return i;
+            }
+        }
+        return n;
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException
+    {
+        boolean v = source.readBoolean();
+        maybeWrite(TypeSizes.BOOL_SIZE, () -> teeBuffer.writeBoolean(v));
+        return v;
+    }
+
+    @Override
+    public byte readByte() throws IOException
+    {
+        byte v = source.readByte();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+        return v;
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException
+    {
+        int v = source.readUnsignedByte();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeByte(v));
+        return v;
+    }
+
+    @Override
+    public short readShort() throws IOException
+    {
+        short v = source.readShort();
+        maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v));
+        return v;
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException
+    {
+        int v = source.readUnsignedShort();
+        maybeWrite(TypeSizes.SHORT_SIZE, () -> teeBuffer.writeShort(v));
+        return v;
+    }
+
+    @Override
+    public char readChar() throws IOException
+    {
+        char v = source.readChar();
+        maybeWrite(TypeSizes.BYTE_SIZE, () -> teeBuffer.writeChar(v));
+        return v;
+    }
+
+    @Override
+    public int readInt() throws IOException
+    {
+        int v = source.readInt();
+        maybeWrite(TypeSizes.INT_SIZE, () -> teeBuffer.writeInt(v));
+        return v;
+    }
+
+    @Override
+    public long readLong() throws IOException
+    {
+        long v = source.readLong();
+        maybeWrite(TypeSizes.LONG_SIZE, () -> teeBuffer.writeLong(v));
+        return v;
+    }
+
+    @Override
+    public float readFloat() throws IOException
+    {
+        float v = source.readFloat();
+        maybeWrite(TypeSizes.FLOAT_SIZE, () -> teeBuffer.writeFloat(v));
+        return v;
+    }
+
+    @Override
+    public double readDouble() throws IOException
+    {
+        double v = source.readDouble();
+        maybeWrite(TypeSizes.DOUBLE_SIZE, () -> teeBuffer.writeDouble(v));
+        return v;
+    }
+
+    @Override
+    public String readLine() throws IOException
+    {
+        // This one isn't safe since we don't know the actual line termination type
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        String v = source.readUTF();
+        maybeWrite(TypeSizes.sizeof(v), () -> teeBuffer.writeUTF(v));
+        return v;
+    }
+
+    @Override
+    public long readVInt() throws IOException
+    {
+        long v = source.readVInt();
+        maybeWrite(TypeSizes.sizeofVInt(v), () -> teeBuffer.writeVInt(v));
+        return v;
+    }
+
+    @Override
+    public long readUnsignedVInt() throws IOException
+    {
+        long v = source.readUnsignedVInt();
+        maybeWrite(TypeSizes.sizeofUnsignedVInt(v), () -> teeBuffer.writeUnsignedVInt(v));
+        return v;
+    }
+
+    @Override
+    public void skipBytesFully(int n) throws IOException
+    {
+        source.skipBytesFully(n);
+        maybeWrite(n, () -> {
+            for (int i = 0; i < n; i++)
+                teeBuffer.writeByte(0);
+        });
+    }
+
+    /**
+     * Used to detect if the teeBuffer hit the supplied limit.
+     * If true this means the teeBuffer does not contain the full input.
+     */
+    public boolean isLimitReached()
+    {
+        return limitReached;
+    }
+}
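
A minimal usage sketch of the class above, shaped like the unit test added at the end of this patch: deserialize from a source while capturing the exact bytes consumed, and only trust the captured copy when the limit was not hit. The class name and the 1 KiB limit are illustrative:

    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.io.util.TeeDataInputPlus;

    public class TeeDataInputPlusSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Build some source bytes.
            DataOutputBuffer source = new DataOutputBuffer();
            source.writeUTF("hello");
            source.writeInt(42);

            // Read through the tee, capturing everything that is read (up to a 1 KiB limit).
            DataOutputBuffer captured = new DataOutputBuffer();
            TeeDataInputPlus tee = new TeeDataInputPlus(new DataInputBuffer(source.toByteArray()), captured, 1024);
            String s = tee.readUTF();
            int i = tee.readInt();

            // Only use the captured bytes if the limit was never reached (otherwise they are cropped).
            byte[] replay = tee.isLimitReached() ? null : captured.toByteArray();
            System.out.println(s + " " + i + ", captured " + (replay == null ? 0 : replay.length) + " bytes");
        }
    }
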
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index d968a0ce2b..650bc03e9e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.net;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -212,6 +215,23 @@ public class MessagingService extends MessagingServiceMBeanImpl
     public static final int current_version = VERSION_40;
     static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
     static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
+    static Map<Integer, Integer> versionOrdinalMap = Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, v -> v.ordinal()));
+
+    /**
+     * This is an optimisation to speed up the translation of the serialization
+     * version to the {@link Version} enum ordinal.
+     *
+     * @param version the serialization version
+     * @return a {@link Version} ordinal value
+     */
+    public static int getVersionOrdinal(int version)
+    {
+        Integer ordinal = versionOrdinalMap.get(version);
+        if (ordinal == null)
+            throw new IllegalStateException("Unkown serialization version: " + 
version);
+
+        return ordinal;
+    }
 
     public enum Version
     {
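
getVersionOrdinal() above simply turns a wire serialization version into a dense index, which Mutation uses for its per-version cache array. A self-contained sketch of the same value-to-ordinal mapping; the enum constants and wire values are illustrative stand-ins rather than the real MessagingService.Version values:

    import java.util.Arrays;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class VersionOrdinalSketch
    {
        // Illustrative stand-in for MessagingService.Version: each constant carries a wire value.
        enum Version
        {
            V30(10), V3014(11), V40(12);
            final int value;
            Version(int value) { this.value = value; }
        }

        // Wire-value -> ordinal map, computed once, mirroring MessagingService.versionOrdinalMap.
        static final Map<Integer, Integer> ORDINALS =
            Arrays.stream(Version.values()).collect(Collectors.toMap(v -> v.value, Enum::ordinal));

        static int getVersionOrdinal(int version)
        {
            Integer ordinal = ORDINALS.get(version);
            if (ordinal == null)
                throw new IllegalStateException("Unknown serialization version: " + version);
            return ordinal;
        }

        public static void main(String[] args)
        {
            System.out.println(getVersionOrdinal(12)); // prints 2, a valid index into a per-version array
        }
    }
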
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3c20aad9ce..52039c1d9f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1475,6 +1475,15 @@ public class StorageProxy implements StorageProxyMBean
 
         List<InetAddressAndPort> backPressureHosts = null;
 
+        // For performance, Mutation caches serialized buffers that are computed lazily in serialization(). That
+        // computation is not synchronized, however, and we would potentially call that method concurrently for each
+        // dispatched message (not that concurrent calls to serialization() are "unsafe" per se, just that they
+        // may result in multiple computations, making the caching optimization moot). So we force the serialization
+        // here to make sure it's already cached/computed when it's concurrently used later.
+        // Side note: we have one cached buffer for each used EncodingVersion and this only pre-computes the one for
+        // the current version, but it's just an optimization and we're ok not optimizing for mixed-version clusters.
+        Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version);
+
         for (Replica destination : plan.contacts())
         {
             checkHintOverload(destination);
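
The comment above describes a warm-once pattern: force the serialization for the current messaging version before fanning out to replicas, so the later per-replica serialize() calls reuse the cached bytes instead of racing to build them. A short sketch of that call sequence, assuming a Mutation named mutation is in scope as it is in sendToHintedReplicas:

    // Warm the per-version cache once on the coordinator path...
    Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version);
    // ...then every later Mutation.serializer.serialize(mutation, out, MessagingService.current_version)
    // call for the same version reuses the cached byte[] rather than re-serializing per replica.
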
diff --git a/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java b/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java
new file mode 100644
index 0000000000..baac9ca73d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TeeDataInputPlusTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.TeeDataInputPlus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TeeDataInputPlusTest
+{
+    @Test
+    public void testTeeBuffer() throws Exception
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        byte[] testData;
+
+        // boolean
+        out.writeBoolean(true);
+        // byte
+        out.writeByte(0x1);
+        // char
+        out.writeChar('a');
+        // short
+        out.writeShort(1);
+        // int
+        out.writeInt(1);
+        // long
+        out.writeLong(1L);
+        // float
+        out.writeFloat(1.0f);
+        // double
+        out.writeDouble(1.0d);
+        // vint
+        out.writeVInt(-1337L);
+        //unsigned vint
+        out.writeUnsignedVInt(1337L);
+        // String
+        out.writeUTF("abc");
+        //Another string to test skip
+        out.writeUTF("garbagetoskipattheend");
+        testData = out.toByteArray();
+
+        int LIMITED_SIZE = 40;
+        DataInputBuffer reader = new DataInputBuffer(testData);
+        DataInputBuffer reader2 = new DataInputBuffer(testData);
+        DataOutputBuffer teeOut = new DataOutputBuffer();
+        DataOutputBuffer limitedTeeOut = new DataOutputBuffer();
+        TeeDataInputPlus tee = new TeeDataInputPlus(reader, teeOut);
+        TeeDataInputPlus limitedTee = new TeeDataInputPlus(reader2, limitedTeeOut, LIMITED_SIZE);
+
+        // boolean = 1byte
+        boolean bool = tee.readBoolean();
+        assertTrue(bool);
+        bool = limitedTee.readBoolean();
+        assertTrue(bool);
+        // byte = 1byte
+        byte b = tee.readByte();
+        assertEquals(b, 0x1);
+        b = limitedTee.readByte();
+        assertEquals(b, 0x1);
+        // char = 2byte
+        char c = tee.readChar();
+        assertEquals('a', c);
+        c = limitedTee.readChar();
+        assertEquals('a', c);
+        // short = 2bytes
+        short s = tee.readShort();
+        assertEquals(1, s);
+        s = limitedTee.readShort();
+        assertEquals(1, s);
+        // int = 4bytes
+        int i = tee.readInt();
+        assertEquals(1, i);
+        i = limitedTee.readInt();
+        assertEquals(1, i);
+        // long = 8bytes
+        long l = tee.readLong();
+        assertEquals(1L, l);
+        l = limitedTee.readLong();
+        assertEquals(1L, l);
+        // float = 4bytes
+        float f = tee.readFloat();
+        assertEquals(1.0f, f, 0);
+        f = limitedTee.readFloat();
+        assertEquals(1.0f, f, 0);
+        // double = 8bytes
+        double d = tee.readDouble();
+        assertEquals(1.0d, d, 0);
+        d = limitedTee.readDouble();
+        assertEquals(1.0d, d, 0);
+        long vint = tee.readVInt();
+        assertEquals(-1337L, vint);
+        vint = limitedTee.readVInt();
+        assertEquals(-1337L, vint);
+        long uvint = tee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        uvint = limitedTee.readUnsignedVInt();
+        assertEquals(1337L, uvint);
+        // String("abc") = 2(string size) + 3 = 5 bytes
+        String str = tee.readUTF();
+        assertEquals("abc", str);
+        str = limitedTee.readUTF();
+        assertEquals("abc", str);
+        int skipped = tee.skipBytes(100);
+        assertEquals(23, skipped);
+        skipped = limitedTee.skipBytes(100);
+        assertEquals(23, skipped);
+
+        byte[] teeData = teeOut.toByteArray();
+        assertFalse(tee.isLimitReached());
+        assertTrue(Arrays.equals(testData, teeData));
+
+        byte[] limitedTeeData = limitedTeeOut.toByteArray();
+        assertTrue(limitedTee.isLimitReached());
+        assertTrue(Arrays.equals(Arrays.copyOf(testData, LIMITED_SIZE - 1), limitedTeeData));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
