Serialize ClusteringPrefix in microbatches, using vint encoding. Patch by benedict; reviewed by sylvain for CASSANDRA-9708.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5786b320 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5786b320 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5786b320 Branch: refs/heads/trunk Commit: 5786b3204d6da352124338c0130451e27dd056b0 Parents: c4c9eae Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Jun 17 09:58:41 2015 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Jul 23 23:26:24 2015 +0100 ---------------------------------------------------------------------- .../apache/cassandra/db/ClusteringPrefix.java | 129 +++++++++---------- src/java/org/apache/cassandra/db/TypeSizes.java | 5 + .../cassandra/db/rows/UnfilteredSerializer.java | 2 +- .../cassandra/cql3/SerializationMirrorTest.java | 63 +++++++++ 4 files changed, 133 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 7b9d582..713ad1b 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -286,109 +286,103 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException { - if (clustering.size() == 0) - return; - - writeHeader(clustering, out); - for (int i = 0; i < clustering.size(); i++) + int offset = 0; + int clusteringSize = clustering.size(); + // serialize in batches of 32, to avoid garbage when deserializing headers + while (offset < clusteringSize) { - 
ByteBuffer v = clustering.get(i); - if (v == null || !v.hasRemaining()) - continue; // handled in the header - - types.get(i).writeValue(v, out); + // we micro-batch the headers, so that we can incur fewer method calls, + // and generate no garbage on deserialization; + // we piggyback on vint encoding so that, typically, only 1 byte is used per 32 clustering values, + // i.e. more than we ever expect to see + int limit = Math.min(clusteringSize, offset + 32); + out.writeUnsignedVInt(makeHeader(clustering, offset, limit)); + while (offset < limit) + { + ByteBuffer v = clustering.get(offset); + if (v != null && v.hasRemaining()) + types.get(offset).writeValue(v, out); + offset++; + } } } long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int version, List<AbstractType<?>> types) { - if (clustering.size() == 0) - return 0; - - long size = headerBytesCount(clustering.size()); - for (int i = 0; i < clustering.size(); i++) + long result = 0; + int offset = 0; + int clusteringSize = clustering.size(); + while (offset < clusteringSize) + { + int limit = Math.min(clusteringSize, offset + 32); + result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, offset, limit)); + offset = limit; + } + for (int i = 0; i < clusteringSize; i++) { ByteBuffer v = clustering.get(i); if (v == null || !v.hasRemaining()) continue; // handled in the header - size += types.get(i).writtenLength(v); + result += types.get(i).writtenLength(v); } - return size; + return result; } ByteBuffer[] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException { // Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway). assert size > 0; - ByteBuffer[] values = new ByteBuffer[size]; - int[] header = readHeader(size, in); - for (int i = 0; i < size; i++) + int offset = 0; + while (offset < size) { - values[i] = isNull(header, i) - ? null - : (isEmpty(header, i) ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(i).readValue(in)); + long header = in.readUnsignedVInt(); + int limit = Math.min(size, offset + 32); + while (offset < limit) + { + values[offset] = isNull(header, offset) + ? null + : (isEmpty(header, offset) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(offset).readValue(in)); + offset++; + } } return values; } - private int headerBytesCount(int size) - { - // For each component, we store 2 bit to know if the component is empty or null (or neither). - // We thus handle 4 component per byte - return size / 4 + (size % 4 == 0 ? 0 : 1); - } - /** * Whatever the type of a given clustering column is, its value can always be either empty or null. So we at least need to distinguish those * 2 values, and because we want to be able to store fixed width values without appending their (fixed) size first, we need a way to encode * empty values too. So for that, every clustering prefix includes a "header" that contains 2 bits per element in the prefix. For each element, * those 2 bits encode whether the element is null, empty, or none of those. 
*/ - private void writeHeader(ClusteringPrefix clustering, DataOutputPlus out) throws IOException + private static long makeHeader(ClusteringPrefix clustering, int offset, int limit) { - int nbBytes = headerBytesCount(clustering.size()); - for (int i = 0; i < nbBytes; i++) + long header = 0; + for (int i = offset ; i < limit ; i++) { - int b = 0; - for (int j = 0; j < 4; j++) - { - int c = i * 4 + j; - if (c >= clustering.size()) - break; - - ByteBuffer v = clustering.get(c); - if (v == null) - b |= (1 << (j * 2) + 1); - else if (!v.hasRemaining()) - b |= (1 << (j * 2)); - } - out.writeByte((byte)b); + ByteBuffer v = clustering.get(i); + // no need to do modulo arithmetic for i, since the left-shift execute on the modulus of RH operand by definition + if (v == null) + header |= (1L << (i * 2) + 1); + else if (!v.hasRemaining()) + header |= (1L << (i * 2)); } - } - - private int[] readHeader(int size, DataInputPlus in) throws IOException - { - int nbBytes = headerBytesCount(size); - int[] header = new int[nbBytes]; - for (int i = 0; i < nbBytes; i++) - header[i] = in.readUnsignedByte(); return header; } - private static boolean isNull(int[] header, int i) + // no need to do modulo arithmetic for i, since the left-shift execute on the modulus of RH operand by definition + private static boolean isNull(long header, int i) { - int b = header[i / 4]; - int mask = 1 << ((i % 4) * 2) + 1; - return (b & mask) != 0; + long mask = 1L << (i * 2) + 1; + return (header & mask) != 0; } - private static boolean isEmpty(int[] header, int i) + // no need to do modulo arithmetic for i, since the left-shift execute on the modulus of RH operand by definition + private static boolean isEmpty(long header, int i) { - int b = header[i / 4]; - int mask = 1 << ((i % 4) * 2); - return (b & mask) != 0; + long mask = 1L << (i * 2); + return (header & mask) != 0; } } @@ -408,7 +402,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable private final SerializationHeader 
serializationHeader; private boolean nextIsRow; - private int[] nextHeader; + private long nextHeader; private int nextSize; private ClusteringPrefix.Kind nextKind; @@ -428,7 +422,6 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW; this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()]; this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort(); - this.nextHeader = serializer.readHeader(nextSize, in); this.deserializedSize = 0; // The point of the deserializer is that some of the clustering prefix won't actually be used (because they are not @@ -478,6 +471,9 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable if (deserializedSize == nextSize) return false; + if ((deserializedSize % 32) == 0) + nextHeader = in.readUnsignedVInt(); + int i = deserializedSize++; nextValues[i] = Serializer.isNull(nextHeader, i) ? null @@ -513,9 +509,12 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable { for (int i = deserializedSize; i < nextSize; i++) { + if ((i % 32) == 0) + nextHeader = in.readUnsignedVInt(); if (!Serializer.isNull(nextHeader, i) && !Serializer.isEmpty(nextHeader, i)) serializationHeader.clusteringTypes().get(i).skipValue(in); } + deserializedSize = nextSize; return nextKind; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/TypeSizes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java index 73766c8..7e5bd87 100644 --- a/src/java/org/apache/cassandra/db/TypeSizes.java +++ b/src/java/org/apache/cassandra/db/TypeSizes.java @@ -102,4 +102,9 @@ public final class TypeSizes { return VIntCoding.computeVIntSize(value); } + + public static int sizeofUnsignedVInt(long value) + { + return 
VIntCoding.computeUnsignedVIntSize(value); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 4072f8d..11fa800 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -21,8 +21,8 @@ import java.io.IOException; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.SearchIterator; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/5786b320/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java b/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java new file mode 100644 index 0000000..49f77a7 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/SerializationMirrorTest.java @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.cql3; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class SerializationMirrorTest extends CQLTester +{ + + @Test + public void testManyClusterings() throws Throwable + { + StringBuilder table = new StringBuilder("CREATE TABLE %s (a TEXT"); + StringBuilder cols = new StringBuilder(); + StringBuilder args = new StringBuilder("?"); + List<Object> vals = new ArrayList<>(); + vals.add("a"); + for (int i = 0 ; i < 40 ; i++) + { + table.append(", c").append(i).append(" text"); + cols.append(", c").append(i); + if (ThreadLocalRandom.current().nextBoolean()) + vals.add(Integer.toString(i)); + else + vals.add(""); + args.append(",?"); + } + args.append(",?"); + vals.add("value"); + table.append(", v text, PRIMARY KEY ((a)").append(cols).append("))"); + createTable(table.toString()); + + execute("INSERT INTO %s (a" + cols + ", v) VALUES (" + args+ ")", vals.toArray()); + flush(); + UntypedResultSet.Row row = execute("SELECT * FROM %s").one(); + for (int i = 0 ; i < row.getColumns().size() ; i++) + Assert.assertEquals(vals.get(i), row.getString(i == 0 ? "a" : i < 41 ? "c" + (i - 1) : "v")); + } + +}