Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 91564abad -> 84d836137
Add duration type to the protocol V5 patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-12850 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84d83613 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84d83613 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84d83613 Branch: refs/heads/cassandra-3.11 Commit: 84d836137d02f7703e9efae44f499b0a9413226d Parents: 91564ab Author: Benjamin Lerer <b.le...@gmail.com> Authored: Fri Jan 27 10:56:00 2017 +0100 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Fri Jan 27 10:56:00 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v5.spec | 151 +++++++++++-------- .../apache/cassandra/transport/DataType.java | 62 +++++++- .../apache/cassandra/transport/OptionCodec.java | 121 --------------- .../cassandra/transport/DataTypeTest.java | 61 ++++---- 5 files changed, 183 insertions(+), 213 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index aec644f..66e17a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Add duration type to the protocol V5 (CASSANDRA-12850) * Fix duration type validation (CASSANDRA-13143) * Fix flaky GcCompactionTest (CASSANDRA-12664) * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/doc/native_protocol_v5.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index 35dd2d7..ac3373c 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -202,51 +202,69 @@ Table of Contents To describe the layout of the 
frame body for the messages in Section 4, we define the following: - [int] A 4 bytes integer - [long] A 8 bytes integer - [byte] A 1 byte unsigned integer - [short] A 2 bytes unsigned integer - [string] A [short] n, followed by n bytes representing an UTF-8 - string. - [long string] An [int] n, followed by n bytes representing an UTF-8 string. - [uuid] A 16 bytes long uuid. - [string list] A [short] n, followed by n [string]. - [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0, - no byte should follow and the value represented is `null`. - [value] A [int] n, followed by n bytes if n >= 0. - If n == -1 no byte should follow and the value represented is `null`. - If n == -2 no byte should follow and the value represented is - `not set` not resulting in any change to the existing value. - n < -2 is an invalid value and results in an error. - [short bytes] A [short] n, followed by n bytes if n >= 0. - - [option] A pair of <id><value> where <id> is a [short] representing - the option id and <value> depends on that option (and can be - of size 0). The supported id (and the corresponding <value>) - will be described when this is used. - [option list] A [short] n, followed by n [option]. - [inet] An address (ip and port) to a node. It consists of one - [byte] n, that represents the address size, followed by n - [byte] representing the IP address (in practice n can only be - either 4 (IPv4) or 16 (IPv6)), following by one [int] - representing the port. - [inetaddr] An IP address (without a port) to a node. It consists of one - [byte] n, that represents the address size, followed by n - [byte] representing the IP address. - [consistency] A consistency level specification. 
This is a [short] - representing a consistency level with the following - correspondance: - 0x0000 ANY - 0x0001 ONE - 0x0002 TWO - 0x0003 THREE - 0x0004 QUORUM - 0x0005 ALL - 0x0006 LOCAL_QUORUM - 0x0007 EACH_QUORUM - 0x0008 SERIAL - 0x0009 LOCAL_SERIAL - 0x000A LOCAL_ONE + [int] A 4 bytes integer + [long] A 8 bytes integer + [byte] A 1 byte unsigned integer + [short] A 2 bytes unsigned integer + [string] A [short] n, followed by n bytes representing an UTF-8 + string. + [long string] An [int] n, followed by n bytes representing an UTF-8 string. + [uuid] A 16 bytes long uuid. + [string list] A [short] n, followed by n [string]. + [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0, + no byte should follow and the value represented is `null`. + [value] A [int] n, followed by n bytes if n >= 0. + If n == -1 no byte should follow and the value represented is `null`. + If n == -2 no byte should follow and the value represented is + `not set` not resulting in any change to the existing value. + n < -2 is an invalid value and results in an error. + [short bytes] A [short] n, followed by n bytes if n >= 0. + + [unsigned vint] An unsigned variable length integer. A vint is encoded with the most significant byte (MSB) first. + The most significant byte will contain the information about how many extra bytes need to be read + as well as the most significant bits of the integer. + The number of extra bytes to read is encoded as 1 bits on the left side. + For example, if we need to read 2 more bytes the first byte will start with 110 + (e.g. 256 000 will be encoded on 3 bytes as [110]00011 11101000 00000000) + If the encoded integer is 8 bytes long the vint will be encoded on 9 bytes and the first + byte will be: 11111111 + + [vint] A signed variable length integer. This is encoded using zig-zag encoding and then sent + like an [unsigned vint]. Zig-zag encoding converts numbers as follows: + 0 = 0, -1 = 1, 1 = 2, -2 = 3, 2 = 4, -3 = 5, 3 = 6 and so forth.
+ The purpose is to send small negative values as small unsigned values, so that we save bytes on the wire. + To encode a value n use "(n >> 31) ^ (n << 1)" for 32 bit values, and "(n >> 63) ^ (n << 1)" + for 64 bit values where "^" is the xor operation, "<<" is the left shift operation and ">>" is + the arithmetic right shift operation (highest-order bit is replicated). + Decode with "(n >>> 1) ^ -(n & 1)", where ">>>" is the logical right shift operation (zero-filling); + an arithmetic shift would give wrong results when the highest-order bit of n is set. + + [option] A pair of <id><value> where <id> is a [short] representing + the option id and <value> depends on that option (and can be + of size 0). The supported id (and the corresponding <value>) + will be described when this is used. + [option list] A [short] n, followed by n [option]. + [inet] An address (ip and port) to a node. It consists of one + [byte] n, that represents the address size, followed by n + [byte] representing the IP address (in practice n can only be + either 4 (IPv4) or 16 (IPv6)), followed by one [int] + representing the port. + [inetaddr] An IP address (without a port) to a node. It consists of one + [byte] n, that represents the address size, followed by n + [byte] representing the IP address. + [consistency] A consistency level specification. This is a [short] + representing a consistency level with the following + correspondence: + 0x0000 ANY + 0x0001 ONE + 0x0002 TWO + 0x0003 THREE + 0x0004 QUORUM + 0x0005 ALL + 0x0006 LOCAL_QUORUM + 0x0007 EACH_QUORUM + 0x0008 SERIAL + 0x0009 LOCAL_SERIAL + 0x000A LOCAL_ONE [string map] A [short] n, followed by n pair <k><v> where <k> and <v> are [string]. @@ -613,6 +631,7 @@ Table of Contents 0x0012 Time 0x0013 Smallint 0x0014 Tinyint + 0x0015 Duration 0x0020 List: the value is an [option], representing the type of the elements of the list. 0x0021 Map: the value is two [option], representing the types of the @@ -881,77 +900,86 @@ Table of Contents An 8 byte floating point number in the IEEE 754 binary64 format.
-6.8 float +6.8 duration + + A duration is composed of 3 signed variable length integers ([vint]s). + The first [vint] represents a number of months, the second [vint] represents + a number of days, and the last [vint] represents a number of nanoseconds. + A duration can either be positive or negative. If a duration is positive + all the integers must be positive or zero. If a duration is + negative all the numbers must be negative or zero. + +6.9 float A 4 byte floating point number in the IEEE 754 binary32 format. -6.9 inet +6.10 inet A 4 byte or 16 byte sequence denoting an IPv4 or IPv6 address, respectively. -6.10 int +6.11 int A 4 byte two's complement integer. -6.11 list +6.12 list A [int] n indicating the number of elements in the list, followed by n elements. Each element is [bytes] representing the serialized value. -6.12 map +6.13 map A [int] n indicating the number of key/value pairs in the map, followed by n entries. Each entry is composed of two [bytes] representing the key and value. -6.13 set +6.14 set A [int] n indicating the number of elements in the set, followed by n elements. Each element is [bytes] representing the serialized value. -6.14 smallint +6.15 smallint A 2 byte two's complement integer. -6.15 text +6.16 text A sequence of bytes conforming to the UTF-8 specifications. -6.16 time +6.17 time An 8 byte two's complement long representing nanoseconds since midnight. Valid values are in the range 0 to 86399999999999 -6.17 timestamp +6.18 timestamp An 8 byte two's complement integer representing a millisecond-precision offset from the unix epoch (00:00:00, January 1st, 1970). Negative values represent a negative offset from the epoch. -6.18 timeuuid +6.19 timeuuid A 16 byte sequence representing a version 1 UUID as defined by RFC 4122. -6.19 tinyint +6.20 tinyint A 1 byte two's complement integer. -6.20 tuple +6.21 tuple A sequence of [bytes] values representing the items in a tuple. 
The encoding of each element depends on the data type for that position in the tuple. Null values may be represented by using length -1 for the [bytes] representation of an element. -6.21 uuid +6.22 uuid A 16 byte sequence representing any valid UUID as defined by RFC 4122. -6.22 varchar +6.23 varchar An alias of the "text" type. -6.23 varint +6.24 varint A variable-length two's complement encoding of a signed integer. @@ -1182,3 +1210,4 @@ Table of Contents a failure reason code which indicates why the request failed on that node. * Enlarged flag's bitmaps for QUERY, EXECUTE and BATCH messages from [byte] to [int] (Sections 4.1.4, 4.1.6 and 4.1.7). + * Add the duration data type http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/src/java/org/apache/cassandra/transport/DataType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java index aef3fa1..6456b74 100644 --- a/src/java/org/apache/cassandra/transport/DataType.java +++ b/src/java/org/apache/cassandra/transport/DataType.java @@ -33,7 +33,7 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.utils.Pair; -public enum DataType implements OptionCodec.Codecable<DataType> +public enum DataType { CUSTOM (0, null, ProtocolVersion.V1), ASCII (1, AsciiType.instance, ProtocolVersion.V1), @@ -56,13 +56,14 @@ public enum DataType implements OptionCodec.Codecable<DataType> TIME (18, TimeType.instance, ProtocolVersion.V4), SMALLINT (19, ShortType.instance, ProtocolVersion.V4), BYTE (20, ByteType.instance, ProtocolVersion.V4), + DURATION (21, DurationType.instance, ProtocolVersion.V5), LIST (32, null, ProtocolVersion.V1), MAP (33, null, ProtocolVersion.V1), SET (34, null, ProtocolVersion.V1), UDT (48, null, ProtocolVersion.V3), TUPLE (49, null, ProtocolVersion.V3); - public static final 
OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class); + public static final Codec codec = new Codec(); private final int id; private final ProtocolVersion protocolVersion; @@ -302,4 +303,61 @@ public enum DataType implements OptionCodec.Codecable<DataType> { return protocolVersion; } + + public static final class Codec + { + private final DataType[] ids; + + public Codec() + { + DataType[] values = DataType.values(); + ids = new DataType[getMaxId(values) + 1]; + for (DataType opt : values) + { + int id = opt.getId(opt.getProtocolVersion()); + DataType existingType = ids[id]; + if (existingType != null) + throw new IllegalStateException(String.format("Duplicate option id %d", id)); + ids[id] = opt; + } + } + + private int getMaxId(DataType[] values) + { + int maxId = -1; + for (DataType opt : values) + maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT)); + return maxId; + } + + private DataType fromId(int id) + { + DataType opt = ids[id]; + if (opt == null) + throw new ProtocolException(String.format("Unknown option id %d", id)); + return opt; + } + + public Pair<DataType, Object> decodeOne(ByteBuf body, ProtocolVersion version) + { + DataType opt = fromId(body.readUnsignedShort()); + Object value = opt.readValue(body, version); + return Pair.create(opt, value); + } + + public void writeOne(Pair<DataType, Object> option, ByteBuf dest, ProtocolVersion version) + { + DataType opt = option.left; + Object obj = option.right; + dest.writeShort(opt.getId(version)); + opt.writeValue(obj, dest, version); + } + + public int oneSerializedSize(Pair<DataType, Object> option, ProtocolVersion version) + { + DataType opt = option.left; + Object obj = option.right; + return 2 + opt.serializedValueSize(obj, version); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/src/java/org/apache/cassandra/transport/OptionCodec.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java deleted file mode 100644 index cdfadf6..0000000 --- a/src/java/org/apache/cassandra/transport/OptionCodec.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.transport; - -import java.lang.reflect.Array; -import java.util.EnumMap; -import java.util.Map; - -import io.netty.buffer.ByteBuf; - -import io.netty.buffer.Unpooled; -import org.apache.cassandra.utils.Pair; - -public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>> -{ - public interface Codecable<T extends Enum<T>> - { - public int getId(ProtocolVersion version); - - public Object readValue(ByteBuf cb, ProtocolVersion version); - public void writeValue(Object value, ByteBuf cb, ProtocolVersion version); - public int serializedValueSize(Object obj, ProtocolVersion version); - } - - private final Class<T> klass; - private final T[] ids; - - @SuppressWarnings({"unchecked"}) - public OptionCodec(Class<T> klass) - { - this.klass = klass; - - T[] values = klass.getEnumConstants(); - int maxId = -1; - for (T opt : values) - maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT)); - ids = (T[])Array.newInstance(klass, maxId + 1); - for (T opt : values) - { - if (ids[opt.getId(ProtocolVersion.CURRENT)] != null) - throw new IllegalStateException(String.format("Duplicate option id %d", opt.getId(ProtocolVersion.CURRENT))); - ids[opt.getId(ProtocolVersion.CURRENT)] = opt; - } - } - - private T fromId(int id) - { - T opt = ids[id]; - if (opt == null) - throw new ProtocolException(String.format("Unknown option id %d", id)); - return opt; - } - - public Map<T, Object> decode(ByteBuf body, ProtocolVersion version) - { - EnumMap<T, Object> options = new EnumMap<T, Object>(klass); - int n = body.readUnsignedShort(); - for (int i = 0; i < n; i++) - { - T opt = fromId(body.readUnsignedShort()); - Object value = opt.readValue(body, version); - if (options.containsKey(opt)) - throw new ProtocolException(String.format("Duplicate option %s in message", opt.name())); - options.put(opt, value); - } - return options; - } - - public ByteBuf encode(Map<T, Object> options, ProtocolVersion version) - { - int optLength = 2; - for 
(Map.Entry<T, Object> entry : options.entrySet()) - optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version); - ByteBuf cb = Unpooled.buffer(optLength); - cb.writeShort(options.size()); - for (Map.Entry<T, Object> entry : options.entrySet()) - { - T opt = entry.getKey(); - cb.writeShort(opt.getId(version)); - opt.writeValue(entry.getValue(), cb, version); - } - return cb; - } - - public Pair<T, Object> decodeOne(ByteBuf body, ProtocolVersion version) - { - T opt = fromId(body.readUnsignedShort()); - Object value = opt.readValue(body, version); - return Pair.create(opt, value); - } - - public void writeOne(Pair<T, Object> option, ByteBuf dest, ProtocolVersion version) - { - T opt = option.left; - Object obj = option.right; - dest.writeShort(opt.getId(version)); - opt.writeValue(obj, dest, version); - } - - public int oneSerializedSize(Pair<T, Object> option, ProtocolVersion version) - { - T opt = option.left; - Object obj = option.right; - return 2 + opt.serializedValueSize(obj, version); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/test/unit/org/apache/cassandra/transport/DataTypeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/DataTypeTest.java b/test/unit/org/apache/cassandra/transport/DataTypeTest.java index c25730c..6f086c9 100644 --- a/test/unit/org/apache/cassandra/transport/DataTypeTest.java +++ b/test/unit/org/apache/cassandra/transport/DataTypeTest.java @@ -26,10 +26,13 @@ import java.util.Map; import org.junit.Test; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; @@ -43,7 +46,7 @@ public class DataTypeTest if (isComplexType(type)) 
continue; - Map<DataType, Object> options = Collections.singletonMap(type, (Object)type.toString()); + Pair<DataType, Object> options = Pair.create(type, (Object)type.toString()); for (ProtocolVersion version : ProtocolVersion.SUPPORTED) testEncodeDecode(type, options, version); } @@ -53,7 +56,7 @@ public class DataTypeTest public void TestListDataTypeSerialization() { DataType type = DataType.LIST; - Map<DataType, Object> options = Collections.singletonMap(type, (Object)LongType.instance); + Pair<DataType, Object> options = Pair.create(type, (Object)LongType.instance); for (ProtocolVersion version : ProtocolVersion.SUPPORTED) testEncodeDecode(type, options, version); } @@ -65,41 +68,41 @@ public class DataTypeTest List<AbstractType> value = new ArrayList<>(); value.add(LongType.instance); value.add(AsciiType.instance); - Map<DataType, Object> options = Collections.singletonMap(type, (Object)value); + Pair<DataType, Object> options = Pair.create(type, (Object)value); for (ProtocolVersion version : ProtocolVersion.SUPPORTED) testEncodeDecode(type, options, version); } - private void testEncodeDecode(DataType type, Map<DataType, Object> options, ProtocolVersion version) + private void testEncodeDecode(DataType type, Pair<DataType, Object> options, ProtocolVersion version) { - ByteBuf dest = type.codec.encode(options, version); - Map<DataType, Object> results = type.codec.decode(dest, version); + int optLength = DataType.codec.oneSerializedSize(options, version); + ByteBuf dest = Unpooled.buffer(optLength); + DataType.codec.writeOne(options, dest, version); + Pair<DataType, Object> result = DataType.codec.decodeOne(dest, version); - for (DataType key : results.keySet()) + System.out.println(result + "version " + version); + int ssize = type.serializedValueSize(result.right, version); + int esize = version.isSmallerThan(type.getProtocolVersion()) ? 
2 + TypeSizes.encodedUTF8Length(result.right.toString()) : 0; + switch (type) { - int ssize = type.serializedValueSize(results.get(key), version); - int esize = version.isSmallerThan(type.getProtocolVersion()) ? 2 + TypeSizes.encodedUTF8Length(results.get(key).toString()) : 0; - switch (type) - { - case LIST: - case SET: - esize += 2; - break; - case MAP: - esize += 4; - break; - case CUSTOM: - esize = 8; - break; - } - assertEquals(esize, ssize); - - DataType expected = version.isSmallerThan(type.getProtocolVersion()) - ? DataType.CUSTOM - : type; - assertEquals(expected, key); + case LIST: + case SET: + esize += 2; + break; + case MAP: + esize += 4; + break; + case CUSTOM: + esize = 8; + break; } - } + assertEquals(esize, ssize); + + DataType expected = version.isSmallerThan(type.getProtocolVersion()) + ? DataType.CUSTOM + : type; + assertEquals(expected, result.left); + } private boolean isComplexType(DataType type) {