Updated Branches: refs/heads/trunk 5d3b08ef1 -> bf2ee0443
Add binary protocol versioning patch by marcuse, reviewed by pcmanus for CASSANDRA-5436 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf2ee044 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf2ee044 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf2ee044 Branch: refs/heads/trunk Commit: bf2ee04431fa3f8142ad43cbf514b69741002f7e Parents: 5d3b08e Author: Marcus Eriksson <marc...@spotify.com> Authored: Wed Apr 17 13:10:54 2013 -0400 Committer: Marcus Eriksson <marc...@spotify.com> Committed: Wed Apr 24 10:11:09 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- src/java/org/apache/cassandra/cql3/ResultSet.java | 6 ++-- .../org/apache/cassandra/transport/CBCodec.java | 2 +- src/java/org/apache/cassandra/transport/Frame.java | 10 +++--- .../org/apache/cassandra/transport/Message.java | 25 ++++++++++++--- .../transport/messages/AuthenticateMessage.java | 2 +- .../transport/messages/CredentialsMessage.java | 2 +- .../cassandra/transport/messages/ErrorMessage.java | 2 +- .../cassandra/transport/messages/EventMessage.java | 2 +- .../transport/messages/ExecuteMessage.java | 2 +- .../transport/messages/OptionsMessage.java | 2 +- .../transport/messages/PrepareMessage.java | 2 +- .../cassandra/transport/messages/QueryMessage.java | 2 +- .../cassandra/transport/messages/ReadyMessage.java | 2 +- .../transport/messages/RegisterMessage.java | 2 +- .../transport/messages/ResultMessage.java | 18 +++++----- .../transport/messages/StartupMessage.java | 2 +- .../transport/messages/SupportedMessage.java | 2 +- 18 files changed, 51 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4fdef2c..10136fe 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,7 +35,7 @@ out TreeRequests (CASSANDRA-4932) * Add an official way to disable compactions (CASSANDRA-5074) * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919) - + * Add binary protocol versioning (CASSANDRA-5436) 1.2.5 * remove per-row column name bloom filters (CASSANDRA-5492) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/cql3/ResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java index 3ddfdc4..5f0dcf1 100644 --- a/src/java/org/apache/cassandra/cql3/ResultSet.java +++ b/src/java/org/apache/cassandra/cql3/ResultSet.java @@ -175,9 +175,9 @@ public class ResultSet * - rows count (4 bytes) * - rows */ - public ResultSet decode(ChannelBuffer body) + public ResultSet decode(ChannelBuffer body, int version) { - Metadata m = Metadata.codec.decode(body); + Metadata m = Metadata.codec.decode(body, version); int rowCount = body.readInt(); ResultSet rs = new ResultSet(m, new ArrayList<List<ByteBuffer>>(rowCount)); @@ -255,7 +255,7 @@ public class ResultSet private static class Codec implements CBCodec<Metadata> { - public Metadata decode(ChannelBuffer body) + public Metadata decode(ChannelBuffer body, int version) { // flags & column count int iflags = body.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/CBCodec.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java index 1a6719b..2250816 100644 --- a/src/java/org/apache/cassandra/transport/CBCodec.java +++ b/src/java/org/apache/cassandra/transport/CBCodec.java @@ -21,6 +21,6 @@ import org.jboss.netty.buffer.ChannelBuffer; public interface CBCodec<T> { - public T decode(ChannelBuffer body); + public T decode(ChannelBuffer body, int version); public ChannelBuffer encode(T t); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index be9df1a..014d512 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -75,16 +75,16 @@ public class Frame return new Frame(header, fullFrame, connection); } - public static Frame create(Message.Type type, int streamId, EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection) + public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection) { - Header header = new Header(Header.CURRENT_VERSION, flags, streamId, type); + Header header = new Header(version, flags, streamId, type); return new Frame(header, body, connection); } public static class Header { public static final int LENGTH = 8; - public static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = 2; public final int version; public final EnumSet<Flag> flags; @@ -170,8 +170,8 @@ public class Frame int firstByte = buffer.getByte(0); Message.Direction direction = Message.Direction.extractFromVersion(firstByte); int version = firstByte & 0x7F; - // We really only support the current version so far - if (version != Header.CURRENT_VERSION) + + if (version > Header.CURRENT_VERSION) throw new ProtocolException("Invalid or unsupported protocol version: " + version); // Validate the opcode http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 5cc3998..3121ce9 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -115,6 +115,7 @@ public abstract class Message public final Type type; protected volatile Connection connection; private volatile int streamId; + private volatile int version = Frame.Header.CURRENT_VERSION; protected Message(Type type) { @@ -142,6 +143,17 @@ public abstract class Message return streamId; } + public int getVersion() + { + return version; + } + + public Message setVersion(int version) + { + this.version = version; + return this; + } + public abstract ChannelBuffer encode(); public static abstract class Request extends Message @@ -207,8 +219,9 @@ public abstract class Message try { - Message message = frame.header.type.codec.decode(frame.body); + Message message = frame.header.type.codec.decode(frame.body, frame.header.version); message.setStreamId(frame.header.streamId); + message.setVersion(frame.header.version); if (isRequest) { @@ -260,7 +273,8 @@ public abstract class Message if (((Request)message).isTracingRequested()) flags.add(Frame.Header.Flag.TRACING); } - return Frame.create(message.type, message.getStreamId(), flags, body, message.connection()); + + return Frame.create(message.type, message.getStreamId(), message.getVersion(), flags, body, message.connection()); } } @@ -282,21 +296,22 @@ public abstract class Message ServerConnection connection = (ServerConnection)request.connection(); connection.validateNewMessage(request.type); - logger.debug("Received: {}", request); + logger.debug("Received: {}, v={}", request, request.getVersion()); Response response = request.execute(connection.getQueryState(request.getStreamId())); response.setStreamId(request.getStreamId()); + response.setVersion(request.getVersion()); response.attach(connection); connection.applyStateTransition(request.type, response.type); - logger.debug("Responding: {}", response); + logger.debug("Responding: {}, v={}", response, response.getVersion()); ctx.getChannel().write(response); } catch (Exception ex) { // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID. - ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId())); + ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()).setVersion(request.getVersion())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java index ce436ea..d781f68 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java @@ -29,7 +29,7 @@ public class AuthenticateMessage extends Message.Response { public static final Message.Codec<AuthenticateMessage> codec = new Message.Codec<AuthenticateMessage>() { - public AuthenticateMessage decode(ChannelBuffer body) + public AuthenticateMessage decode(ChannelBuffer body, int version) { String authenticator = CBUtil.readString(body); return new AuthenticateMessage(authenticator); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java index db82844..512675e 100644 --- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java @@ -35,7 +35,7 @@ public class CredentialsMessage extends Message.Request { public static final Message.Codec<CredentialsMessage> codec = new Message.Codec<CredentialsMessage>() { - public CredentialsMessage decode(ChannelBuffer body) + public CredentialsMessage decode(ChannelBuffer body, int version) { CredentialsMessage msg = new CredentialsMessage(); int count = body.readUnsignedShort(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index dd685fa..3243bce 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -43,7 +43,7 @@ public class ErrorMessage extends Message.Response public static final Message.Codec<ErrorMessage> codec = new Message.Codec<ErrorMessage>() { - public ErrorMessage decode(ChannelBuffer body) + public ErrorMessage decode(ChannelBuffer body, int version) { ExceptionCode code = ExceptionCode.fromValue(body.readInt()); String msg = CBUtil.readString(body); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/EventMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java index d8fd6f0..7d67de9 100644 --- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java @@ -26,7 +26,7 @@ public class EventMessage extends Message.Response { public static final Message.Codec<EventMessage> codec = new Message.Codec<EventMessage>() { - public EventMessage decode(ChannelBuffer body) + public EventMessage decode(ChannelBuffer body, int version) { return new EventMessage(Event.deserialize(body)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 2752d2b..62fa10a 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -39,7 +39,7 @@ public class ExecuteMessage extends Message.Request { public static final Message.Codec<ExecuteMessage> codec = new Message.Codec<ExecuteMessage>() { - public ExecuteMessage decode(ChannelBuffer body) + public ExecuteMessage decode(ChannelBuffer body, int version) { byte[] id = CBUtil.readBytes(body); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java index 30bd046..6e753d3 100644 --- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java @@ -37,7 +37,7 @@ public class OptionsMessage extends Message.Request { public static final Message.Codec<OptionsMessage> codec = new Message.Codec<OptionsMessage>() { - public OptionsMessage decode(ChannelBuffer body) + public OptionsMessage decode(ChannelBuffer body, int version) { return new OptionsMessage(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 6705474..13f6321 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -32,7 +32,7 @@ public class PrepareMessage extends Message.Request { public static final Message.Codec<PrepareMessage> codec = new Message.Codec<PrepareMessage>() { - public PrepareMessage decode(ChannelBuffer body) + public PrepareMessage decode(ChannelBuffer body, int version) { String query = CBUtil.readLongString(body); return new PrepareMessage(query); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 8d83e47..69c6529 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -38,7 +38,7 @@ public class QueryMessage extends Message.Request { public static final Message.Codec<QueryMessage> codec = new Message.Codec<QueryMessage>() { - public QueryMessage decode(ChannelBuffer body) + public QueryMessage decode(ChannelBuffer body, int version) { String query = CBUtil.readLongString(body); ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java index fa92619..63899e1 100644 --- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java @@ -29,7 +29,7 @@ public class ReadyMessage extends Message.Response { public static final Message.Codec<ReadyMessage> codec = new Message.Codec<ReadyMessage>() { - public ReadyMessage decode(ChannelBuffer body) + public ReadyMessage decode(ChannelBuffer body, int version) { return new ReadyMessage(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java index 61e11af..9969e3a 100644 --- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java @@ -30,7 +30,7 @@ public class RegisterMessage extends Message.Request { public static final Message.Codec<RegisterMessage> codec = new Message.Codec<RegisterMessage>() { - public RegisterMessage decode(ChannelBuffer body) + public RegisterMessage decode(ChannelBuffer body, int version) { int length = body.readUnsignedShort(); List<Event.Type> eventTypes = new ArrayList<Event.Type>(length); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index 739738c..fcff0a8 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -34,10 +34,10 @@ public abstract class ResultMessage extends Message.Response { public static final Message.Codec<ResultMessage> codec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { Kind kind = Kind.fromId(body.readInt()); - return kind.subcodec.decode(body); + return kind.subcodec.decode(body, version); } public ChannelBuffer encode(ResultMessage msg) @@ -119,7 +119,7 @@ public abstract class ResultMessage extends Message.Response public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { return new Void(); } @@ -160,7 +160,7 @@ public abstract class ResultMessage extends Message.Response public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { String keyspace = CBUtil.readString(body); return new SetKeyspace(keyspace); @@ -194,9 +194,9 @@ public abstract class ResultMessage extends Message.Response { public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { - return new Rows(ResultSet.codec.decode(body)); + return new Rows(ResultSet.codec.decode(body, version)); } public ChannelBuffer encode(ResultMessage msg) @@ -237,10 +237,10 @@ public abstract class ResultMessage extends Message.Response { public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body)); - return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body)); + return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body, version)); } public ChannelBuffer encode(ResultMessage msg) @@ -328,7 +328,7 @@ public abstract class ResultMessage extends Message.Response public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() { - public ResultMessage decode(ChannelBuffer body) + public ResultMessage decode(ChannelBuffer body, int version) { String cStr = CBUtil.readString(body); Change change = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 7e32769..66b245b 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -40,7 +40,7 @@ public class StartupMessage extends Message.Request public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>() { - public StartupMessage decode(ChannelBuffer body) + public StartupMessage decode(ChannelBuffer body, int version) { return new StartupMessage(CBUtil.readStringMap(body)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf2ee044/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java index b031ca8..8f7873d 100644 --- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java @@ -33,7 +33,7 @@ public class SupportedMessage extends Message.Response { public static final Message.Codec<SupportedMessage> codec = new Message.Codec<SupportedMessage>() { - public SupportedMessage decode(ChannelBuffer body) + public SupportedMessage decode(ChannelBuffer body, int version) { return new SupportedMessage(CBUtil.readStringToStringListMap(body)); }