Updated Branches:
  refs/heads/trunk 5d3b08ef1 -> bf2ee0443

Add binary protocol versioning

patch by marcuse, reviewed by pcmanus for CASSANDRA-5436


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf2ee044
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf2ee044
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf2ee044

Branch: refs/heads/trunk
Commit: bf2ee04431fa3f8142ad43cbf514b69741002f7e
Parents: 5d3b08e
Author: Marcus Eriksson <marc...@spotify.com>
Authored: Wed Apr 17 13:10:54 2013 -0400
Committer: Marcus Eriksson <marc...@spotify.com>
Committed: Wed Apr 24 10:11:09 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 src/java/org/apache/cassandra/cql3/ResultSet.java  |    6 ++--
 .../org/apache/cassandra/transport/CBCodec.java    |    2 +-
 src/java/org/apache/cassandra/transport/Frame.java |   10 +++---
 .../org/apache/cassandra/transport/Message.java    |   25 ++++++++++++---
 .../transport/messages/AuthenticateMessage.java    |    2 +-
 .../transport/messages/CredentialsMessage.java     |    2 +-
 .../cassandra/transport/messages/ErrorMessage.java |    2 +-
 .../cassandra/transport/messages/EventMessage.java |    2 +-
 .../transport/messages/ExecuteMessage.java         |    2 +-
 .../transport/messages/OptionsMessage.java         |    2 +-
 .../transport/messages/PrepareMessage.java         |    2 +-
 .../cassandra/transport/messages/QueryMessage.java |    2 +-
 .../cassandra/transport/messages/ReadyMessage.java |    2 +-
 .../transport/messages/RegisterMessage.java        |    2 +-
 .../transport/messages/ResultMessage.java          |   18 +++++-----
 .../transport/messages/StartupMessage.java         |    2 +-
 .../transport/messages/SupportedMessage.java       |    2 +-
 18 files changed, 51 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fdef2c..10136fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,7 +35,7 @@
    out TreeRequests (CASSANDRA-4932)
  * Add an official way to disable compactions (CASSANDRA-5074)
  * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
-
+ * Add binary protocol versioning (CASSANDRA-5436)
 
 1.2.5
  * remove per-row column name bloom filters (CASSANDRA-5492)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java 
b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 3ddfdc4..5f0dcf1 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -175,9 +175,9 @@ public class ResultSet
          *   - rows count (4 bytes)
          *   - rows
          */
-        public ResultSet decode(ChannelBuffer body)
+        public ResultSet decode(ChannelBuffer body, int version)
         {
-            Metadata m = Metadata.codec.decode(body);
+            Metadata m = Metadata.codec.decode(body, version);
             int rowCount = body.readInt();
             ResultSet rs = new ResultSet(m, new 
ArrayList<List<ByteBuffer>>(rowCount));
 
@@ -255,7 +255,7 @@ public class ResultSet
 
         private static class Codec implements CBCodec<Metadata>
         {
-            public Metadata decode(ChannelBuffer body)
+            public Metadata decode(ChannelBuffer body, int version)
             {
                 // flags & column count
                 int iflags = body.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java 
b/src/java/org/apache/cassandra/transport/CBCodec.java
index 1a6719b..2250816 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -21,6 +21,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
 
 public interface CBCodec<T>
 {
-    public T decode(ChannelBuffer body);
+    public T decode(ChannelBuffer body, int version);
     public ChannelBuffer encode(T t);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index be9df1a..014d512 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -75,16 +75,16 @@ public class Frame
         return new Frame(header, fullFrame, connection);
     }
 
-    public static Frame create(Message.Type type, int streamId, 
EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection)
+    public static Frame create(Message.Type type, int streamId, int version, 
EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection)
     {
-        Header header = new Header(Header.CURRENT_VERSION, flags, streamId, 
type);
+        Header header = new Header(version, flags, streamId, type);
         return new Frame(header, body, connection);
     }
 
     public static class Header
     {
         public static final int LENGTH = 8;
-        public static final int CURRENT_VERSION = 1;
+        public static final int CURRENT_VERSION = 2;
 
         public final int version;
         public final EnumSet<Flag> flags;
@@ -170,8 +170,8 @@ public class Frame
                 int firstByte = buffer.getByte(0);
                 Message.Direction direction = 
Message.Direction.extractFromVersion(firstByte);
                 int version = firstByte & 0x7F;
-                // We really only support the current version so far
-                if (version != Header.CURRENT_VERSION)
+
+                if (version > Header.CURRENT_VERSION)
                     throw new ProtocolException("Invalid or unsupported 
protocol version: " + version);
 
                 // Validate the opcode

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 5cc3998..3121ce9 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -115,6 +115,7 @@ public abstract class Message
     public final Type type;
     protected volatile Connection connection;
     private volatile int streamId;
+    private volatile int version = Frame.Header.CURRENT_VERSION;
 
     protected Message(Type type)
     {
@@ -142,6 +143,17 @@ public abstract class Message
         return streamId;
     }
 
+    public int getVersion()
+    {
+        return version;
+    }
+
+    public Message setVersion(int version)
+    {
+        this.version = version;
+        return this;
+    }
+
     public abstract ChannelBuffer encode();
 
     public static abstract class Request extends Message
@@ -207,8 +219,9 @@ public abstract class Message
 
             try
             {
-                Message message = frame.header.type.codec.decode(frame.body);
+                Message message = frame.header.type.codec.decode(frame.body, 
frame.header.version);
                 message.setStreamId(frame.header.streamId);
+                message.setVersion(frame.header.version);
 
                 if (isRequest)
                 {
@@ -260,7 +273,8 @@ public abstract class Message
                 if (((Request)message).isTracingRequested())
                     flags.add(Frame.Header.Flag.TRACING);
             }
-            return Frame.create(message.type, message.getStreamId(), flags, 
body, message.connection());
+
+            return Frame.create(message.type, message.getStreamId(), 
message.getVersion(), flags, body, message.connection());
         }
     }
 
@@ -282,21 +296,22 @@ public abstract class Message
                 ServerConnection connection = 
(ServerConnection)request.connection();
                 connection.validateNewMessage(request.type);
 
-                logger.debug("Received: {}", request);
+                logger.debug("Received: {}, v={}", request, 
request.getVersion());
 
                 Response response = 
request.execute(connection.getQueryState(request.getStreamId()));
                 response.setStreamId(request.getStreamId());
+                response.setVersion(request.getVersion());
                 response.attach(connection);
                 connection.applyStateTransition(request.type, response.type);
 
-                logger.debug("Responding: {}", response);
+                logger.debug("Responding: {}, v={}", response, 
response.getVersion());
 
                 ctx.getChannel().write(response);
             }
             catch (Exception ex)
             {
                 // Don't let the exception propagate to exceptionCaught() if 
we can help it so that we can assign the right streamID.
-                
ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
+                
ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()).setVersion(request.getVersion()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java 
b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index ce436ea..d781f68 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -29,7 +29,7 @@ public class AuthenticateMessage extends Message.Response
 {
     public static final Message.Codec<AuthenticateMessage> codec = new 
Message.Codec<AuthenticateMessage>()
     {
-        public AuthenticateMessage decode(ChannelBuffer body)
+        public AuthenticateMessage decode(ChannelBuffer body, int version)
         {
             String authenticator = CBUtil.readString(body);
             return new AuthenticateMessage(authenticator);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java 
b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index db82844..512675e 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -35,7 +35,7 @@ public class CredentialsMessage extends Message.Request
 {
     public static final Message.Codec<CredentialsMessage> codec = new 
Message.Codec<CredentialsMessage>()
     {
-        public CredentialsMessage decode(ChannelBuffer body)
+        public CredentialsMessage decode(ChannelBuffer body, int version)
         {
             CredentialsMessage msg = new CredentialsMessage();
             int count = body.readUnsignedShort();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index dd685fa..3243bce 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -43,7 +43,7 @@ public class ErrorMessage extends Message.Response
 
     public static final Message.Codec<ErrorMessage> codec = new 
Message.Codec<ErrorMessage>()
     {
-        public ErrorMessage decode(ChannelBuffer body)
+        public ErrorMessage decode(ChannelBuffer body, int version)
         {
             ExceptionCode code = ExceptionCode.fromValue(body.readInt());
             String msg = CBUtil.readString(body);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java 
b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index d8fd6f0..7d67de9 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -26,7 +26,7 @@ public class EventMessage extends Message.Response
 {
     public static final Message.Codec<EventMessage> codec = new 
Message.Codec<EventMessage>()
     {
-        public EventMessage decode(ChannelBuffer body)
+        public EventMessage decode(ChannelBuffer body, int version)
         {
             return new EventMessage(Event.deserialize(body));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 2752d2b..62fa10a 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -39,7 +39,7 @@ public class ExecuteMessage extends Message.Request
 {
     public static final Message.Codec<ExecuteMessage> codec = new 
Message.Codec<ExecuteMessage>()
     {
-        public ExecuteMessage decode(ChannelBuffer body)
+        public ExecuteMessage decode(ChannelBuffer body, int version)
         {
             byte[] id = CBUtil.readBytes(body);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java 
b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 30bd046..6e753d3 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -37,7 +37,7 @@ public class OptionsMessage extends Message.Request
 {
     public static final Message.Codec<OptionsMessage> codec = new 
Message.Codec<OptionsMessage>()
     {
-        public OptionsMessage decode(ChannelBuffer body)
+        public OptionsMessage decode(ChannelBuffer body, int version)
         {
             return new OptionsMessage();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 6705474..13f6321 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -32,7 +32,7 @@ public class PrepareMessage extends Message.Request
 {
     public static final Message.Codec<PrepareMessage> codec = new 
Message.Codec<PrepareMessage>()
     {
-        public PrepareMessage decode(ChannelBuffer body)
+        public PrepareMessage decode(ChannelBuffer body, int version)
         {
             String query = CBUtil.readLongString(body);
             return new PrepareMessage(query);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java 
b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 8d83e47..69c6529 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -38,7 +38,7 @@ public class QueryMessage extends Message.Request
 {
     public static final Message.Codec<QueryMessage> codec = new 
Message.Codec<QueryMessage>()
     {
-        public QueryMessage decode(ChannelBuffer body)
+        public QueryMessage decode(ChannelBuffer body, int version)
         {
             String query = CBUtil.readLongString(body);
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index fa92619..63899e1 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -29,7 +29,7 @@ public class ReadyMessage extends Message.Response
 {
     public static final Message.Codec<ReadyMessage> codec = new 
Message.Codec<ReadyMessage>()
     {
-        public ReadyMessage decode(ChannelBuffer body)
+        public ReadyMessage decode(ChannelBuffer body, int version)
         {
             return new ReadyMessage();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java 
b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 61e11af..9969e3a 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -30,7 +30,7 @@ public class RegisterMessage extends Message.Request
 {
     public static final Message.Codec<RegisterMessage> codec = new 
Message.Codec<RegisterMessage>()
     {
-        public RegisterMessage decode(ChannelBuffer body)
+        public RegisterMessage decode(ChannelBuffer body, int version)
         {
             int length = body.readUnsignedShort();
             List<Event.Type> eventTypes = new ArrayList<Event.Type>(length);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 739738c..fcff0a8 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -34,10 +34,10 @@ public abstract class ResultMessage extends Message.Response
 {
     public static final Message.Codec<ResultMessage> codec = new 
Message.Codec<ResultMessage>()
     {
-        public ResultMessage decode(ChannelBuffer body)
+        public ResultMessage decode(ChannelBuffer body, int version)
         {
             Kind kind = Kind.fromId(body.readInt());
-            return kind.subcodec.decode(body);
+            return kind.subcodec.decode(body, version);
         }
 
         public ChannelBuffer encode(ResultMessage msg)
@@ -119,7 +119,7 @@ public abstract class ResultMessage extends Message.Response
 
         public static final Message.Codec<ResultMessage> subcodec = new 
Message.Codec<ResultMessage>()
         {
-            public ResultMessage decode(ChannelBuffer body)
+            public ResultMessage decode(ChannelBuffer body, int version)
             {
                 return new Void();
             }
@@ -160,7 +160,7 @@ public abstract class ResultMessage extends Message.Response
 
         public static final Message.Codec<ResultMessage> subcodec = new 
Message.Codec<ResultMessage>()
         {
-            public ResultMessage decode(ChannelBuffer body)
+            public ResultMessage decode(ChannelBuffer body, int version)
             {
                 String keyspace = CBUtil.readString(body);
                 return new SetKeyspace(keyspace);
@@ -194,9 +194,9 @@ public abstract class ResultMessage extends Message.Response
     {
         public static final Message.Codec<ResultMessage> subcodec = new 
Message.Codec<ResultMessage>()
         {
-            public ResultMessage decode(ChannelBuffer body)
+            public ResultMessage decode(ChannelBuffer body, int version)
             {
-                return new Rows(ResultSet.codec.decode(body));
+                return new Rows(ResultSet.codec.decode(body, version));
             }
 
             public ChannelBuffer encode(ResultMessage msg)
@@ -237,10 +237,10 @@ public abstract class ResultMessage extends 
Message.Response
     {
         public static final Message.Codec<ResultMessage> subcodec = new 
Message.Codec<ResultMessage>()
         {
-            public ResultMessage decode(ChannelBuffer body)
+            public ResultMessage decode(ChannelBuffer body, int version)
             {
                 MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
-                return new Prepared(id, -1, 
ResultSet.Metadata.codec.decode(body));
+                return new Prepared(id, -1, 
ResultSet.Metadata.codec.decode(body, version));
             }
 
             public ChannelBuffer encode(ResultMessage msg)
@@ -328,7 +328,7 @@ public abstract class ResultMessage extends Message.Response
 
         public static final Message.Codec<ResultMessage> subcodec = new 
Message.Codec<ResultMessage>()
         {
-            public ResultMessage decode(ChannelBuffer body)
+            public ResultMessage decode(ChannelBuffer body, int version)
             {
                 String cStr = CBUtil.readString(body);
                 Change change = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java 
b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 7e32769..66b245b 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -40,7 +40,7 @@ public class StartupMessage extends Message.Request
 
     public static final Message.Codec<StartupMessage> codec = new 
Message.Codec<StartupMessage>()
     {
-        public StartupMessage decode(ChannelBuffer body)
+        public StartupMessage decode(ChannelBuffer body, int version)
         {
             return new StartupMessage(CBUtil.readStringMap(body));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java 
b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index b031ca8..8f7873d 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -33,7 +33,7 @@ public class SupportedMessage extends Message.Response
 {
     public static final Message.Codec<SupportedMessage> codec = new 
Message.Codec<SupportedMessage>()
     {
-        public SupportedMessage decode(ChannelBuffer body)
+        public SupportedMessage decode(ChannelBuffer body, int version)
         {
             return new 
SupportedMessage(CBUtil.readStringToStringListMap(body));
         }

Reply via email to