Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 91564abad -> 84d836137


Add duration type to the protocol V5

patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-12850


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84d83613
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84d83613
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84d83613

Branch: refs/heads/cassandra-3.11
Commit: 84d836137d02f7703e9efae44f499b0a9413226d
Parents: 91564ab
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Jan 27 10:56:00 2017 +0100
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Jan 27 10:56:00 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/native_protocol_v5.spec                     | 151 +++++++++++--------
 .../apache/cassandra/transport/DataType.java    |  62 +++++++-
 .../apache/cassandra/transport/OptionCodec.java | 121 ---------------
 .../cassandra/transport/DataTypeTest.java       |  61 ++++----
 5 files changed, 183 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aec644f..66e17a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add duration type to the protocol V5 (CASSANDRA-12850)
  * Fix duration type validation (CASSANDRA-13143)
  * Fix flaky GcCompactionTest (CASSANDRA-12664)
  * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index 35dd2d7..ac3373c 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -202,51 +202,69 @@ Table of Contents
   To describe the layout of the frame body for the messages in Section 4, we
   define the following:
 
-    [int]          A 4 bytes integer
-    [long]         A 8 bytes integer
-    [byte]         A 1 byte unsigned integer
-    [short]        A 2 bytes unsigned integer
-    [string]       A [short] n, followed by n bytes representing an UTF-8
-                   string.
-    [long string]  An [int] n, followed by n bytes representing an UTF-8 
string.
-    [uuid]         A 16 bytes long uuid.
-    [string list]  A [short] n, followed by n [string].
-    [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
-                   no byte should follow and the value represented is `null`.
-    [value]        A [int] n, followed by n bytes if n >= 0.
-                   If n == -1 no byte should follow and the value represented 
is `null`.
-                   If n == -2 no byte should follow and the value represented 
is
-                   `not set` not resulting in any change to the existing value.
-                   n < -2 is an invalid value and results in an error.
-    [short bytes]  A [short] n, followed by n bytes if n >= 0.
-
-    [option]       A pair of <id><value> where <id> is a [short] representing
-                   the option id and <value> depends on that option (and can be
-                   of size 0). The supported id (and the corresponding <value>)
-                   will be described when this is used.
-    [option list]  A [short] n, followed by n [option].
-    [inet]         An address (ip and port) to a node. It consists of one
-                   [byte] n, that represents the address size, followed by n
-                   [byte] representing the IP address (in practice n can only 
be
-                   either 4 (IPv4) or 16 (IPv6)), following by one [int]
-                   representing the port.
-    [inetaddr]     An IP address (without a port) to a node. It consists of one
-                   [byte] n, that represents the address size, followed by n
-                   [byte] representing the IP address.
-    [consistency]  A consistency level specification. This is a [short]
-                   representing a consistency level with the following
-                   correspondance:
-                     0x0000    ANY
-                     0x0001    ONE
-                     0x0002    TWO
-                     0x0003    THREE
-                     0x0004    QUORUM
-                     0x0005    ALL
-                     0x0006    LOCAL_QUORUM
-                     0x0007    EACH_QUORUM
-                     0x0008    SERIAL
-                     0x0009    LOCAL_SERIAL
-                     0x000A    LOCAL_ONE
+    [int]             A 4 bytes integer
+    [long]            A 8 bytes integer
+    [byte]            A 1 byte unsigned integer
+    [short]           A 2 bytes unsigned integer
+    [string]          A [short] n, followed by n bytes representing an UTF-8
+                      string.
+    [long string]     An [int] n, followed by n bytes representing an UTF-8 
string.
+    [uuid]            A 16 bytes long uuid.
+    [string list]     A [short] n, followed by n [string].
+    [bytes]           A [int] n, followed by n bytes if n >= 0. If n < 0,
+                      no byte should follow and the value represented is 
`null`.
+    [value]           A [int] n, followed by n bytes if n >= 0.
+                      If n == -1 no byte should follow and the value 
represented is `null`.
+                      If n == -2 no byte should follow and the value 
represented is
+                      `not set` not resulting in any change to the existing 
value.
+                      n < -2 is an invalid value and results in an error.
+    [short bytes]     A [short] n, followed by n bytes if n >= 0.
+
+    [unsigned vint]   An unsigned variable length integer. A vint is encoded 
with the most significant byte (MSB) first.
+                      The most significant byte will contain the information 
about how many extra bytes need to be read
+                      as well as the most significant bits of the integer.
+                      The number of extra bytes to read is encoded as 1 bits 
on the left side.
+                      For example, if we need to read 2 more bytes the first 
byte will start with 110
+                      (e.g. 256 000 will be encoded on 3 bytes as [110]00011 
11101000 00000000)
+                      If the encoded integer is 8 bytes long the vint will be 
encoded on 9 bytes and the first
+                      byte will be: 11111111
+
+    [vint]            A signed variable length integer. This is encoded using 
zig-zag encoding and then sent
+                      like an [unsigned vint]. Zig-zag encoding converts 
numbers as follows:
+                      0 = 0, -1 = 1, 1 = 2, -2 = 3, 2 = 4, -3 = 5, 3 = 6 and 
so forth.
+                      The purpose is to send small negative values as small 
unsigned values, so that we save bytes on the wire.
+                      To encode a value n use "(n >> 31) ^ (n << 1)" for 32 
bit values, and "(n >> 63) ^ (n << 1)"
+                      for 64 bit values where "^" is the xor operation, "<<" 
is the left shift operation and ">>" is
+                      the arithmetic right shift operation (highest-order bit 
is replicated).
+                      Decode with "(n >> 1) ^ -(n & 1)".
+
+    [option]          A pair of <id><value> where <id> is a [short] 
representing
+                      the option id and <value> depends on that option (and 
can be
+                      of size 0). The supported id (and the corresponding 
<value>)
+                      will be described when this is used.
+    [option list]     A [short] n, followed by n [option].
+    [inet]            An address (ip and port) to a node. It consists of one
+                      [byte] n, that represents the address size, followed by n
+                      [byte] representing the IP address (in practice n can 
only be
+                      either 4 (IPv4) or 16 (IPv6)), followed by one [int]
+                      representing the port.
+    [inetaddr]        An IP address (without a port) to a node. It consists of 
one
+                      [byte] n, that represents the address size, followed by n
+                      [byte] representing the IP address.
+    [consistency]     A consistency level specification. This is a [short]
+                      representing a consistency level with the following
+                      correspondence:
+                        0x0000    ANY
+                        0x0001    ONE
+                        0x0002    TWO
+                        0x0003    THREE
+                        0x0004    QUORUM
+                        0x0005    ALL
+                        0x0006    LOCAL_QUORUM
+                        0x0007    EACH_QUORUM
+                        0x0008    SERIAL
+                        0x0009    LOCAL_SERIAL
+                        0x000A    LOCAL_ONE
 
     [string map]      A [short] n, followed by n pair <k><v> where <k> and <v>
                       are [string].
@@ -613,6 +631,7 @@ Table of Contents
             0x0012    Time
             0x0013    Smallint
             0x0014    Tinyint
+            0x0015    Duration
             0x0020    List: the value is an [option], representing the type
                             of the elements of the list.
             0x0021    Map: the value is two [option], representing the types 
of the
@@ -881,77 +900,86 @@ Table of Contents
 
   An 8 byte floating point number in the IEEE 754 binary64 format.
 
-6.8 float
+6.8 duration
+
+  A duration is composed of 3 signed variable length integers ([vint]s).
+  The first [vint] represents a number of months, the second [vint] represents
+  a number of days, and the last [vint] represents a number of nanoseconds.
+  A duration can either be positive or negative. If a duration is positive
+  all the integers must be positive or zero. If a duration is
+  negative all the numbers must be negative or zero.
+
+6.9 float
 
   A 4 byte floating point number in the IEEE 754 binary32 format.
 
-6.9 inet
+6.10 inet
 
   A 4 byte or 16 byte sequence denoting an IPv4 or IPv6 address, respectively.
 
-6.10 int
+6.11 int
 
   A 4 byte two's complement integer.
 
-6.11 list
+6.12 list
 
   A [int] n indicating the number of elements in the list, followed by n
   elements.  Each element is [bytes] representing the serialized value.
 
-6.12 map
+6.13 map
 
   A [int] n indicating the number of key/value pairs in the map, followed by
   n entries.  Each entry is composed of two [bytes] representing the key
   and value.
 
-6.13 set
+6.14 set
 
   A [int] n indicating the number of elements in the set, followed by n
   elements.  Each element is [bytes] representing the serialized value.
 
-6.14 smallint
+6.15 smallint
 
   A 2 byte two's complement integer.
 
-6.15 text
+6.16 text
 
   A sequence of bytes conforming to the UTF-8 specifications.
 
-6.16 time
+6.17 time
 
   An 8 byte two's complement long representing nanoseconds since midnight.
   Valid values are in the range 0 to 86399999999999
 
-6.17 timestamp
+6.18 timestamp
 
   An 8 byte two's complement integer representing a millisecond-precision
   offset from the unix epoch (00:00:00, January 1st, 1970).  Negative values
   represent a negative offset from the epoch.
 
-6.18 timeuuid
+6.19 timeuuid
 
   A 16 byte sequence representing a version 1 UUID as defined by RFC 4122.
 
-6.19 tinyint
+6.20 tinyint
 
   A 1 byte two's complement integer.
 
-6.20 tuple
+6.21 tuple
 
   A sequence of [bytes] values representing the items in a tuple.  The encoding
   of each element depends on the data type for that position in the tuple.
   Null values may be represented by using length -1 for the [bytes]
   representation of an element.
 
-6.21 uuid
+6.22 uuid
 
   A 16 byte sequence representing any valid UUID as defined by RFC 4122.
 
-6.22 varchar
+6.23 varchar
 
   An alias of the "text" type.
 
-6.23 varint
+6.24 varint
 
   A variable-length two's complement encoding of a signed integer.
 
@@ -1182,3 +1210,4 @@ Table of Contents
     a failure reason code which indicates why the request failed on that node.
   * Enlarged flag's bitmaps for QUERY, EXECUTE and BATCH messages from [byte] 
to [int]
     (Sections 4.1.4, 4.1.6 and 4.1.7).
+  * Add the duration data type

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java 
b/src/java/org/apache/cassandra/transport/DataType.java
index aef3fa1..6456b74 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.utils.Pair;
 
-public enum DataType implements OptionCodec.Codecable<DataType>
+public enum DataType
 {
     CUSTOM   (0,  null, ProtocolVersion.V1),
     ASCII    (1,  AsciiType.instance, ProtocolVersion.V1),
@@ -56,13 +56,14 @@ public enum DataType implements 
OptionCodec.Codecable<DataType>
     TIME     (18, TimeType.instance, ProtocolVersion.V4),
     SMALLINT (19, ShortType.instance, ProtocolVersion.V4),
     BYTE     (20, ByteType.instance, ProtocolVersion.V4),
+    DURATION (21, DurationType.instance, ProtocolVersion.V5),
     LIST     (32, null, ProtocolVersion.V1),
     MAP      (33, null, ProtocolVersion.V1),
     SET      (34, null, ProtocolVersion.V1),
     UDT      (48, null, ProtocolVersion.V3),
     TUPLE    (49, null, ProtocolVersion.V3);
 
-    public static final OptionCodec<DataType> codec = new 
OptionCodec<DataType>(DataType.class);
+    public static final Codec codec = new Codec();
 
     private final int id;
     private final ProtocolVersion protocolVersion;
@@ -302,4 +303,61 @@ public enum DataType implements 
OptionCodec.Codecable<DataType>
     {
         return protocolVersion;
     }
+
+    public static final class Codec
+    {
+        private final DataType[] ids;
+
+        public Codec()
+        {
+            DataType[] values = DataType.values();
+            ids = new DataType[getMaxId(values) + 1];
+            for (DataType opt : values)
+            {
+                int id = opt.getId(opt.getProtocolVersion());
+                DataType existingType = ids[id];
+                if (existingType != null)
+                    throw new IllegalStateException(String.format("Duplicate 
option id %d", id));
+                ids[id] = opt;
+            }
+        }
+
+        private int getMaxId(DataType[] values)
+        {
+            int maxId = -1;
+            for (DataType opt : values)
+                maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT));
+            return maxId;
+        }
+
+        private DataType fromId(int id)
+        {
+            DataType opt = ids[id];
+            if (opt == null)
+                throw new ProtocolException(String.format("Unknown option id 
%d", id));
+            return opt;
+        }
+
+        public Pair<DataType, Object> decodeOne(ByteBuf body, ProtocolVersion 
version)
+        {
+            DataType opt = fromId(body.readUnsignedShort());
+            Object value = opt.readValue(body, version);
+            return Pair.create(opt, value);
+        }
+
+        public void writeOne(Pair<DataType, Object> option, ByteBuf dest, 
ProtocolVersion version)
+        {
+            DataType opt = option.left;
+            Object obj = option.right;
+            dest.writeShort(opt.getId(version));
+            opt.writeValue(obj, dest, version);
+        }
+
+        public int oneSerializedSize(Pair<DataType, Object> option, 
ProtocolVersion version)
+        {
+            DataType opt = option.left;
+            Object obj = option.right;
+            return 2 + opt.serializedValueSize(obj, version);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java 
b/src/java/org/apache/cassandra/transport/OptionCodec.java
deleted file mode 100644
index cdfadf6..0000000
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport;
-
-import java.lang.reflect.Array;
-import java.util.EnumMap;
-import java.util.Map;
-
-import io.netty.buffer.ByteBuf;
-
-import io.netty.buffer.Unpooled;
-import org.apache.cassandra.utils.Pair;
-
-public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
-{
-    public interface Codecable<T extends Enum<T>>
-    {
-        public int getId(ProtocolVersion version);
-
-        public Object readValue(ByteBuf cb, ProtocolVersion version);
-        public void writeValue(Object value, ByteBuf cb, ProtocolVersion 
version);
-        public int serializedValueSize(Object obj, ProtocolVersion version);
-    }
-
-    private final Class<T> klass;
-    private final T[] ids;
-
-    @SuppressWarnings({"unchecked"})
-    public OptionCodec(Class<T> klass)
-    {
-        this.klass = klass;
-
-        T[] values = klass.getEnumConstants();
-        int maxId = -1;
-        for (T opt : values)
-            maxId = Math.max(maxId, opt.getId(ProtocolVersion.CURRENT));
-        ids = (T[])Array.newInstance(klass, maxId + 1);
-        for (T opt : values)
-        {
-            if (ids[opt.getId(ProtocolVersion.CURRENT)] != null)
-                throw new IllegalStateException(String.format("Duplicate 
option id %d", opt.getId(ProtocolVersion.CURRENT)));
-            ids[opt.getId(ProtocolVersion.CURRENT)] = opt;
-        }
-    }
-
-    private T fromId(int id)
-    {
-        T opt = ids[id];
-        if (opt == null)
-            throw new ProtocolException(String.format("Unknown option id %d", 
id));
-        return opt;
-    }
-
-    public Map<T, Object> decode(ByteBuf body, ProtocolVersion version)
-    {
-        EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
-        int n = body.readUnsignedShort();
-        for (int i = 0; i < n; i++)
-        {
-            T opt = fromId(body.readUnsignedShort());
-            Object value = opt.readValue(body, version);
-            if (options.containsKey(opt))
-                throw new ProtocolException(String.format("Duplicate option %s 
in message", opt.name()));
-            options.put(opt, value);
-        }
-        return options;
-    }
-
-    public ByteBuf encode(Map<T, Object> options, ProtocolVersion version)
-    {
-        int optLength = 2;
-        for (Map.Entry<T, Object> entry : options.entrySet())
-            optLength += 2 + 
entry.getKey().serializedValueSize(entry.getValue(), version);
-        ByteBuf cb = Unpooled.buffer(optLength);
-        cb.writeShort(options.size());
-        for (Map.Entry<T, Object> entry : options.entrySet())
-        {
-            T opt = entry.getKey();
-            cb.writeShort(opt.getId(version));
-            opt.writeValue(entry.getValue(), cb, version);
-        }
-        return cb;
-    }
-
-    public Pair<T, Object> decodeOne(ByteBuf body, ProtocolVersion version)
-    {
-        T opt = fromId(body.readUnsignedShort());
-        Object value = opt.readValue(body, version);
-        return Pair.create(opt, value);
-    }
-
-    public void writeOne(Pair<T, Object> option, ByteBuf dest, ProtocolVersion 
version)
-    {
-        T opt = option.left;
-        Object obj = option.right;
-        dest.writeShort(opt.getId(version));
-        opt.writeValue(obj, dest, version);
-    }
-
-    public int oneSerializedSize(Pair<T, Object> option, ProtocolVersion 
version)
-    {
-        T opt = option.left;
-        Object obj = option.right;
-        return 2 + opt.serializedValueSize(obj, version);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84d83613/test/unit/org/apache/cassandra/transport/DataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/DataTypeTest.java 
b/test/unit/org/apache/cassandra/transport/DataTypeTest.java
index c25730c..6f086c9 100644
--- a/test/unit/org/apache/cassandra/transport/DataTypeTest.java
+++ b/test/unit/org/apache/cassandra/transport/DataTypeTest.java
@@ -26,10 +26,13 @@ import java.util.Map;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
 
@@ -43,7 +46,7 @@ public class DataTypeTest
             if (isComplexType(type))
                 continue;
 
-            Map<DataType, Object> options = Collections.singletonMap(type, 
(Object)type.toString());
+            Pair<DataType, Object> options = Pair.create(type, 
(Object)type.toString());
             for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
                 testEncodeDecode(type, options, version);
         }
@@ -53,7 +56,7 @@ public class DataTypeTest
     public void TestListDataTypeSerialization()
     {
         DataType type = DataType.LIST;
-        Map<DataType, Object> options =  Collections.singletonMap(type, 
(Object)LongType.instance);
+        Pair<DataType, Object> options = Pair.create(type, 
(Object)LongType.instance);
         for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
             testEncodeDecode(type, options, version);
     }
@@ -65,41 +68,41 @@ public class DataTypeTest
         List<AbstractType> value = new ArrayList<>();
         value.add(LongType.instance);
         value.add(AsciiType.instance);
-        Map<DataType, Object> options = Collections.singletonMap(type, 
(Object)value);
+        Pair<DataType, Object> options = Pair.create(type, (Object)value);
         for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
             testEncodeDecode(type, options, version);
     }
 
-    private void testEncodeDecode(DataType type, Map<DataType, Object> 
options, ProtocolVersion version)
+    private void testEncodeDecode(DataType type, Pair<DataType, Object> 
options, ProtocolVersion version)
     {
-        ByteBuf dest = type.codec.encode(options, version);
-        Map<DataType, Object> results = type.codec.decode(dest, version);
+        int optLength = DataType.codec.oneSerializedSize(options, version);
+        ByteBuf dest = Unpooled.buffer(optLength);
+        DataType.codec.writeOne(options, dest, version);
+        Pair<DataType, Object> result = DataType.codec.decodeOne(dest, 
version);
 
-        for (DataType key : results.keySet())
+        System.out.println(result + "version " + version);
+        int ssize = type.serializedValueSize(result.right, version);
+        int esize = version.isSmallerThan(type.getProtocolVersion()) ? 2 + 
TypeSizes.encodedUTF8Length(result.right.toString()) : 0;
+        switch (type)
         {
-            int ssize = type.serializedValueSize(results.get(key), version);
-            int esize = version.isSmallerThan(type.getProtocolVersion()) ? 2 + 
TypeSizes.encodedUTF8Length(results.get(key).toString()) : 0;
-            switch (type)
-            {
-                case LIST:
-                case SET:
-                    esize += 2;
-                    break;
-                case MAP:
-                    esize += 4;
-                    break;
-                case CUSTOM:
-                    esize = 8;
-                    break;
-            }
-            assertEquals(esize, ssize);
-
-            DataType expected = 
version.isSmallerThan(type.getProtocolVersion())
-                ? DataType.CUSTOM
-                : type;
-            assertEquals(expected, key);
+            case LIST:
+            case SET:
+                esize += 2;
+                break;
+            case MAP:
+                esize += 4;
+                break;
+            case CUSTOM:
+                esize = 8;
+                break;
         }
-    }
+        assertEquals(esize, ssize);
+
+        DataType expected = version.isSmallerThan(type.getProtocolVersion())
+            ? DataType.CUSTOM
+            : type;
+        assertEquals(expected, result.left);
+   }
 
     private boolean isComplexType(DataType type)
     {

Reply via email to