Serialize ClusteringPrefix in microbatches, using vint encoding

patch by benedict; reviewed by sylvain for CASSANDRA-9708


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5786b320
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5786b320
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5786b320

Branch: refs/heads/trunk
Commit: 5786b3204d6da352124338c0130451e27dd056b0
Parents: c4c9eae
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Wed Jun 17 09:58:41 2015 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Jul 23 23:26:24 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/db/ClusteringPrefix.java   | 129 +++++++++----------
 src/java/org/apache/cassandra/db/TypeSizes.java |   5 +
 .../cassandra/db/rows/UnfilteredSerializer.java |   2 +-
 .../cassandra/cql3/SerializationMirrorTest.java |  63 +++++++++
 4 files changed, 133 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
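
For context: the old format wrote a fixed header of 2 bits per clustering value
packed 4 values per byte, always costing ceil(n/4) bytes and an int[] on read.
The new format packs 2 bits x 32 values into a single long per batch and writes
it as an unsigned vint. The standalone sketch below illustrates that
bit-packing; the class and method names here are illustrative only, the
committed logic lives in ClusteringPrefix.Serializer in the diff that follows.

    import java.nio.ByteBuffer;

    public class ClusteringHeaderSketch
    {
        // for value i: bit (2*i + 1) set => null, bit (2*i) set => empty,
        // neither set => an actual value follows in the serialized stream
        static long makeHeader(ByteBuffer[] values, int offset, int limit)
        {
            long header = 0;
            for (int i = offset; i < limit; i++)
            {
                ByteBuffer v = values[i];
                if (v == null)
                    header |= 1L << ((i * 2) + 1);
                else if (!v.hasRemaining())
                    header |= 1L << (i * 2);
                // a long left-shift uses only the shift amount modulo 64, so the
                // absolute index i never needs to be reduced to its batch offset
            }
            return header;
        }

        static boolean isNull(long header, int i)  { return (header & (1L << ((i * 2) + 1))) != 0; }
        static boolean isEmpty(long header, int i) { return (header & (1L << (i * 2))) != 0; }

        public static void main(String[] args)
        {
            ByteBuffer[] batch = { ByteBuffer.wrap(new byte[]{ 1 }), // present
                                   ByteBuffer.allocate(0),           // empty
                                   null };                           // null
            long header = makeHeader(batch, 0, batch.length);
            System.out.println(isEmpty(header, 1)); // true
            System.out.println(isNull(header, 2));  // true
            System.out.println(header);             // 36 = 0b100100
        }
    }

When every value in a batch is present the header is 0, which vint encoding
writes in a single byte; that is where the size win over the old fixed header
comes from.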


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 7b9d582..713ad1b 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -286,109 +286,103 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
 
         void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
         {
-            if (clustering.size() == 0)
-                return;
-
-            writeHeader(clustering, out);
-            for (int i = 0; i < clustering.size(); i++)
+            int offset = 0;
+            int clusteringSize = clustering.size();
+            // serialize in batches of 32, to avoid garbage when deserializing headers
+            while (offset < clusteringSize)
             {
-                ByteBuffer v = clustering.get(i);
-                if (v == null || !v.hasRemaining())
-                    continue; // handled in the header
-
-                types.get(i).writeValue(v, out);
+                // we micro-batch the headers, so that we can incur fewer method calls,
+                // and generate no garbage on deserialization;
+                // we piggyback on vint encoding so that, typically, only 1 byte is used per 32 clustering values,
+                // i.e. more than we ever expect to see
+                int limit = Math.min(clusteringSize, offset + 32);
+                out.writeUnsignedVInt(makeHeader(clustering, offset, limit));
+                while (offset < limit)
+                {
+                    ByteBuffer v = clustering.get(offset);
+                    if (v != null && v.hasRemaining())
+                        types.get(offset).writeValue(v, out);
+                    offset++;
+                }
             }
         }
 
         long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types)
         {
-            if (clustering.size() == 0)
-                return 0;
-
-            long size = headerBytesCount(clustering.size());
-            for (int i = 0; i < clustering.size(); i++)
+            long result = 0;
+            int offset = 0;
+            int clusteringSize = clustering.size();
+            while (offset < clusteringSize)
+            {
+                int limit = Math.min(clusteringSize, offset + 32);
+                result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, offset, limit));
+                offset = limit;
+            }
+            for (int i = 0; i < clusteringSize; i++)
             {
                 ByteBuffer v = clustering.get(i);
                 if (v == null || !v.hasRemaining())
                     continue; // handled in the header
 
-                size += types.get(i).writtenLength(v);
+                result += types.get(i).writtenLength(v);
             }
-            return size;
+            return result;
         }
 
         ByteBuffer[] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
         {
             // Callers of this method should handle the case where size = 0 (in all cases we want to return a special value anyway).
             assert size > 0;
-
             ByteBuffer[] values = new ByteBuffer[size];
-            int[] header = readHeader(size, in);
-            for (int i = 0; i < size; i++)
+            int offset = 0;
+            while (offset < size)
             {
-                values[i] = isNull(header, i)
-                          ? null
-                          : (isEmpty(header, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(i).readValue(in));
+                long header = in.readUnsignedVInt();
+                int limit = Math.min(size, offset + 32);
+                while (offset < limit)
+                {
+                    values[offset] = isNull(header, offset)
+                                ? null
+                                : (isEmpty(header, offset) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(offset).readValue(in));
+                    offset++;
+                }
             }
             return values;
         }
 
-        private int headerBytesCount(int size)
-        {
-            // For each component, we store 2 bit to know if the component is empty or null (or neither).
-            // We thus handle 4 component per byte
-            return size / 4 + (size % 4 == 0 ? 0 : 1);
-        }
-
         /**
          * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those
          * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode
          * empty values too. So for that, every clustering prefix includes a "header" that contains 2 bits per element in the prefix. For each element,
          * those 2 bits encode whether the element is null, empty, or none of those.
          */
-        private void writeHeader(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+        private static long makeHeader(ClusteringPrefix clustering, int offset, int limit)
         {
-            int nbBytes = headerBytesCount(clustering.size());
-            for (int i = 0; i < nbBytes; i++)
+            long header = 0;
+            for (int i = offset ; i < limit ; i++)
             {
-                int b = 0;
-                for (int j = 0; j < 4; j++)
-                {
-                    int c = i * 4 + j;
-                    if (c >= clustering.size())
-                        break;
-
-                    ByteBuffer v = clustering.get(c);
-                    if (v == null)
-                        b |= (1 << (j * 2) + 1);
-                    else if (!v.hasRemaining())
-                        b |= (1 << (j * 2));
-                }
-                out.writeByte((byte)b);
+                ByteBuffer v = clustering.get(i);
+                // no need to reduce i modulo 32 here: by definition, a long left-shift only uses the shift amount modulo 64
+                if (v == null)
+                    header |= (1L << (i * 2) + 1);
+                else if (!v.hasRemaining())
+                    header |= (1L << (i * 2));
             }
-        }
-
-        private int[] readHeader(int size, DataInputPlus in) throws IOException
-        {
-            int nbBytes = headerBytesCount(size);
-            int[] header = new int[nbBytes];
-            for (int i = 0; i < nbBytes; i++)
-                header[i] = in.readUnsignedByte();
             return header;
         }
 
-        private static boolean isNull(int[] header, int i)
+        // no need to reduce i modulo 32 here: by definition, a long left-shift only uses the shift amount modulo 64
+        private static boolean isNull(long header, int i)
         {
-            int b = header[i / 4];
-            int mask = 1 << ((i % 4) * 2) + 1;
-            return (b & mask) != 0;
+            long mask = 1L << (i * 2) + 1;
+            return (header & mask) != 0;
         }
 
-        private static boolean isEmpty(int[] header, int i)
+        // no need to reduce i modulo 32 here: by definition, a long left-shift only uses the shift amount modulo 64
+        private static boolean isEmpty(long header, int i)
         {
-            int b = header[i / 4];
-            int mask = 1 << ((i % 4) * 2);
-            return (b & mask) != 0;
+            long mask = 1L << (i * 2);
+            return (header & mask) != 0;
         }
     }
 
@@ -408,7 +402,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
         private final SerializationHeader serializationHeader;
 
         private boolean nextIsRow;
-        private int[] nextHeader;
+        private long nextHeader;
 
         private int nextSize;
         private ClusteringPrefix.Kind nextKind;
@@ -428,7 +422,6 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
             this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
             this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();
-            this.nextHeader = serializer.readHeader(nextSize, in);
             this.deserializedSize = 0;
 
             // The point of the deserializer is that some of the clustering prefix won't actually be used (because they are not
@@ -478,6 +471,9 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             if (deserializedSize == nextSize)
                 return false;
 
+            if ((deserializedSize % 32) == 0)
+                nextHeader = in.readUnsignedVInt();
+
             int i = deserializedSize++;
             nextValues[i] = Serializer.isNull(nextHeader, i)
                           ? null
@@ -513,9 +509,12 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
         {
             for (int i = deserializedSize; i < nextSize; i++)
             {
+                if ((i % 32) == 0)
+                    nextHeader = in.readUnsignedVInt();
                 if (!Serializer.isNull(nextHeader, i) && !Serializer.isEmpty(nextHeader, i))
                     serializationHeader.clusteringTypes().get(i).skipValue(in);
             }
+            deserializedSize = nextSize;
             return nextKind;
         }
     }
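
As a rough worked example of the size difference (arithmetic only, not taken
from the patch): the old header always cost ceil(n/4) bytes for n clustering
values, i.e. 10 bytes for n = 40, while the new format writes one unsigned vint
per batch of 32, so with every value present each header is 0 and encodes in 1
byte, giving 2 bytes for n = 40. The sketch below checks that arithmetic; the
unsignedVIntSize helper is hypothetical and assumes the usual 7 payload bits
per byte with a 9-byte cap (the committed code delegates to
VIntCoding.computeUnsignedVIntSize via the new TypeSizes.sizeofUnsignedVInt).

    public class HeaderSizeSketch
    {
        // hypothetical unsigned-vint size: 7 payload bits per byte, capped at 9 bytes
        static int unsignedVIntSize(long value)
        {
            int bits = 64 - Long.numberOfLeadingZeros(value | 1); // at least 1 bit
            return Math.min(9, (bits + 6) / 7);
        }

        public static void main(String[] args)
        {
            int n = 40;                                 // clustering values
            int oldHeaderBytes = n / 4 + (n % 4 == 0 ? 0 : 1);
            int newHeaderBytes = 0;
            for (int offset = 0; offset < n; offset += 32)
                newHeaderBytes += unsignedVIntSize(0L); // all values present
            System.out.println(oldHeaderBytes);         // 10
            System.out.println(newHeaderBytes);         // 2
        }
    }

The deserializer side mirrors the writer: instead of reading a fixed int[]
header up front, it pulls a fresh vint header whenever the index crosses a
multiple of 32, so skipping or partially reading a prefix no longer allocates a
header array.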

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index 73766c8..7e5bd87 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -102,4 +102,9 @@ public final class TypeSizes
     {
         return VIntCoding.computeVIntSize(value);
     }
+
+    public static int sizeofUnsignedVInt(long value)
+    {
+        return VIntCoding.computeUnsignedVIntSize(value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 4072f8d..11fa800 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.SearchIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java b/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java
new file mode 100644
index 0000000..49f77a7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SerializationMirrorTest extends CQLTester
+{
+
+    @Test
+    public void testManyClusterings() throws Throwable
+    {
+        StringBuilder table = new StringBuilder("CREATE TABLE %s (a TEXT");
+        StringBuilder cols = new StringBuilder();
+        StringBuilder args = new StringBuilder("?");
+        List<Object> vals = new ArrayList<>();
+        vals.add("a");
+        for (int i = 0 ; i < 40 ; i++)
+        {
+            table.append(", c").append(i).append(" text");
+            cols.append(", c").append(i);
+            if (ThreadLocalRandom.current().nextBoolean())
+                vals.add(Integer.toString(i));
+            else
+                vals.add("");
+            args.append(",?");
+        }
+        args.append(",?");
+        vals.add("value");
+        table.append(", v text, PRIMARY KEY ((a)").append(cols).append("))");
+        createTable(table.toString());
+
+        execute("INSERT INTO %s (a" + cols + ", v) VALUES (" + args+ ")", vals.toArray());
+        flush();
+        UntypedResultSet.Row row = execute("SELECT * FROM %s").one();
+        for (int i = 0 ; i < row.getColumns().size() ; i++)
+            Assert.assertEquals(vals.get(i), row.getString(i == 0 ? "a" : i < 41 ? "c" + (i - 1) : "v"));
+    }
+
+}
