Updated Branches: refs/heads/trunk f1711794c -> f199fa39b
Adds events push to binary protocol patch by slebresne; reviewed by thepaul for CASSANDRA-4480 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f199fa39 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f199fa39 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f199fa39 Branch: refs/heads/trunk Commit: f199fa39bb7a536a203a5154de24ae67c137ec23 Parents: f171179 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Aug 8 17:12:20 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Sep 7 11:02:03 2012 +0200 ---------------------------------------------------------------------- doc/native_protocol.spec | 59 ++++++- .../org/apache/cassandra/transport/CBUtil.java | 29 +++ .../org/apache/cassandra/transport/Client.java | 13 ++ .../org/apache/cassandra/transport/Connection.java | 97 ++-------- src/java/org/apache/cassandra/transport/Event.java | 146 +++++++++++++++ src/java/org/apache/cassandra/transport/Frame.java | 6 +- .../org/apache/cassandra/transport/Message.java | 9 +- .../org/apache/cassandra/transport/Server.java | 74 +++++++- .../cassandra/transport/ServerConnection.java | 93 +++++++++ .../apache/cassandra/transport/SimpleClient.java | 16 +-- .../transport/messages/CredentialsMessage.java | 3 +- .../cassandra/transport/messages/EventMessage.java | 60 ++++++ .../transport/messages/ExecuteMessage.java | 5 +- .../transport/messages/PrepareMessage.java | 2 +- .../cassandra/transport/messages/QueryMessage.java | 2 +- .../transport/messages/RegisterMessage.java | 80 ++++++++ .../transport/messages/StartupMessage.java | 9 +- 17 files changed, 588 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec 
b/doc/native_protocol.spec index 1de7a95..07d5c69 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -8,8 +8,9 @@ Table of Contents 2. Frame header 2.1. version 2.2. flags - 2.3. opcode - 2.4. length + 2.3. stream + 2.4. opcode + 2.5. length 3. Notations 4. Messages 4.1. Requests @@ -19,6 +20,7 @@ Table of Contents 4.1.4. QUERY 4.1.5. PREPARE 4.1.6. EXECUTE + 4.1.7. REGISTER 4.2. Responses 4.2.1. ERROR 4.2.2. READY @@ -29,6 +31,7 @@ Table of Contents 4.2.5.2. Rows 4.2.5.3. Set_keyspace 4.2.5.4. Prepared + 4.2.6. EVENT 5. Compression 6. Error codes @@ -100,9 +103,10 @@ Table of Contents A frame has a stream id (one signed byte). When sending request messages, this stream id must be set by the client to a positive byte (negative stream id - are reserved for future stream initiated by the server). If a client sends a - request message with the stream id X, it is guaranteed that the stream id of - the response to that message will be X. + are reserved for streams initiated by the server; currently all EVENT messages + (section 4.2.6) have a streamId of -1). If a client sends a request message + with the stream id X, it is guaranteed that the stream id of the response to + that message will be X. This allow to deal with the asynchronous nature of the protocol. If a client sends multiple messages simultaneously (without waiting for responses), there @@ -131,11 +135,13 @@ Table of Contents 0x08 RESULT 0x09 PREPARE 0x0A EXECUTE + 0x0B REGISTER + 0x0C EVENT Messages are described in Section 4. -2.4. length +2.5. length A 4 byte integer representing the length of the body of the frame (note: currently a frame is limited to 256MB in length). @@ -160,6 +166,11 @@ Table of Contents of size 0). The supported id (and the corresponding <value>) will be described when this is used. [option list] A [short] n, followed by n [option]. + [inet] An address (ip and port) to a node. 
It consists of one + [byte] n, that represents the address size, followed by n + [byte] representing the IP address (in practice n can only be + either 4 (IPv4) or 16 (IPv6)), followed by one [int] + representing the port. [string map] A [short] n, followed by n pair <k><v> where <k> and <v> are [string]. @@ -247,6 +258,21 @@ Table of Contents The response from the server will be a RESULT message. +4.1.7. REGISTER + + Register this connection to receive some types of events. The body of the + message is a [string list] representing the event types to register for. See + section 4.2.6 for the list of valid event types. + + The response to a REGISTER message will be a READY message. + + Please note that if a client driver maintains multiple connections to a + Cassandra node and/or connections to multiple nodes, it is advised to + dedicate a handful of connections to receive events, but to *not* register + for events on all connections, as this would only result in receiving + the same event messages multiple times, wasting bandwidth. + + 4.2. Responses This section describes the content of the frame body for the different @@ -388,6 +414,27 @@ Table of Contents - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2). +4.2.6. EVENT + + An event pushed by the server. A client will only receive events for the + types it has REGISTERed for. The body of an EVENT message will start with a + [string] representing the event type. The rest of the message depends on the + event type. The valid event types are: + - "TOPOLOGY_CHANGE": events related to changes in the cluster topology. + Currently, events are sent when new nodes are added to the cluster, and + when nodes are removed. The body of the message (after the event type) + consists of a [string] and an [inet], corresponding respectively to the + type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of + the new/removed node. + - "STATUS_CHANGE": events related to changes of node status. 
Currently, + up/down events are sent. The body of the message (after the event type) + consists of a [string] and an [inet], corresponding respectively to the + type of status change ("UP" or "DOWN") followed by the address of the + concerned node. + + All EVENT message have a streamId of -1 (Section 2.3). + + 5. Compression Frame compression is supported by the protocol, but then only the frame body http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 44ae64d..b977f35 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -17,6 +17,9 @@ */ package org.apache.cassandra.transport; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.ArrayList; @@ -182,6 +185,32 @@ public abstract class CBUtil return length < 0 ? 
null : cb.readSlice(length).toByteBuffer(); } + public static InetSocketAddress readInet(ChannelBuffer cb) + { + int addrSize = cb.readByte(); + byte[] address = new byte[addrSize]; + cb.readBytes(address); + int port = cb.readInt(); + try + { + return new InetSocketAddress(InetAddress.getByAddress(address), port); + } + catch (UnknownHostException e) + { + throw new ProtocolException(String.format("Invalid IP address (%d.%d.%d.%d) while deserializing inet address", address[0], address[1], address[2], address[3])); + } + } + + public static ChannelBuffer inetToCB(InetSocketAddress inet) + { + byte[] address = inet.getAddress().getAddress(); + ChannelBuffer cb = ChannelBuffers.buffer(1 + address.length + 4); + cb.writeByte(address.length); + cb.writeBytes(address); + cb.writeInt(inet.getPort()); + return cb; + } + public static class BufferBuilder { private final int size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 2cf815f..b9e00fa 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -150,6 +150,19 @@ public class Client extends SimpleClient } return msg; } + else if (msgType.equals("REGISTER")) + { + String type = line.substring(9).toUpperCase(); + try + { + return new RegisterMessage(Collections.singletonList(Enum.valueOf(Event.Type.class, type))); + } + catch (IllegalArgumentException e) + { + System.err.println("[ERROR] Unknown event type: " + type); + return null; + } + } return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Connection.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java index 9f874a3..67f06a4 100644 --- a/src/java/org/apache/cassandra/transport/Connection.java +++ b/src/java/org/apache/cassandra/transport/Connection.java @@ -19,19 +19,16 @@ package org.apache.cassandra.transport; import org.jboss.netty.channel.Channel; -import org.apache.cassandra.service.ClientState; - -public abstract class Connection +public class Connection { - public static final Factory SERVER_FACTORY = new Factory() - { - public Connection newConnection() - { - return new ServerConnection(); - } - }; + private volatile FrameCompressor frameCompressor; + private volatile Channel channel; + private final Tracker tracker; - private FrameCompressor frameCompressor; + public Connection(Tracker tracker) + { + this.tracker = tracker; + } public void setCompressor(FrameCompressor compressor) { @@ -43,77 +40,25 @@ public abstract class Connection return frameCompressor; } - public abstract void validateNewMessage(Message.Type type); - public abstract void applyStateTransition(Message.Type requestType, Message.Type responseType); - public abstract ClientState clientState(); - - public interface Factory + public Tracker getTracker() { - public Connection newConnection(); + return tracker; } - private static class ServerConnection extends Connection + public void registerChannel(Channel ch) { - private enum State { UNINITIALIZED, AUTHENTICATION, READY; } - - private final ClientState clientState; - private State state; - - public ServerConnection() - { - this.clientState = new ClientState(); - this.state = State.UNINITIALIZED; - } - - public ClientState clientState() - { - return clientState; - } + channel = ch; + tracker.addConnection(ch, this); + } - public void validateNewMessage(Message.Type type) - { - switch (state) - { - case UNINITIALIZED: - if (type != Message.Type.STARTUP && type != Message.Type.OPTIONS) - throw new 
ProtocolException(String.format("Unexpected message %s, expecting STARTUP or OPTIONS", type)); - break; - case AUTHENTICATION: - if (type != Message.Type.CREDENTIALS) - throw new ProtocolException(String.format("Unexpected message %s, needs authentication through CREDENTIALS message", type)); - break; - case READY: - if (type == Message.Type.STARTUP) - throw new ProtocolException("Unexpected message STARTUP, the connection is already initialized"); - break; - default: - throw new AssertionError(); - } - } + public Channel channel() + { + return channel; + } - public void applyStateTransition(Message.Type requestType, Message.Type responseType) - { - switch (state) - { - case UNINITIALIZED: - if (requestType == Message.Type.STARTUP) - { - if (responseType == Message.Type.AUTHENTICATE) - state = State.AUTHENTICATION; - else if (responseType == Message.Type.READY) - state = State.READY; - } - break; - case AUTHENTICATION: - assert requestType == Message.Type.CREDENTIALS; - if (responseType == Message.Type.READY) - state = State.READY; - case READY: - break; - default: - throw new AssertionError(); - } - } + public interface Factory + { + public Connection newConnection(Tracker tracker); } public interface Tracker http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java new file mode 100644 index 0000000..849caff --- /dev/null +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport; + +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +public abstract class Event +{ + public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE } + + public final Type type; + + private Event(Type type) + { + this.type = type; + } + + public static Event deserialize(ChannelBuffer cb) + { + switch (Enum.valueOf(Type.class, CBUtil.readString(cb).toUpperCase())) + { + case TOPOLOGY_CHANGE: + return TopologyChange.deserializeEvent(cb); + case STATUS_CHANGE: + return StatusChange.deserializeEvent(cb); + } + throw new AssertionError(); + } + + public ChannelBuffer serialize() + { + return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(type.toString()), + serializeEvent()); + } + + protected abstract ChannelBuffer serializeEvent(); + + public static class TopologyChange extends Event + { + public enum Change { NEW_NODE, REMOVED_NODE } + + public final Change change; + public final InetSocketAddress node; + + private TopologyChange(Change change, InetSocketAddress node) + { + super(Type.TOPOLOGY_CHANGE); + this.change = change; + this.node = node; + } + + public static TopologyChange newNode(InetAddress host, int port) + { + return new TopologyChange(Change.NEW_NODE, new InetSocketAddress(host, port)); + } + + public static TopologyChange 
removedNode(InetAddress host, int port) + { + return new TopologyChange(Change.REMOVED_NODE, new InetSocketAddress(host, port)); + } + + // Assumes the type has already by been deserialized + private static TopologyChange deserializeEvent(ChannelBuffer cb) + { + Change change = Enum.valueOf(Change.class, CBUtil.readString(cb).toUpperCase()); + InetSocketAddress node = CBUtil.readInet(cb); + return new TopologyChange(change, node); + } + + protected ChannelBuffer serializeEvent() + { + return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(change.toString()), + CBUtil.inetToCB(node)); + } + + @Override + public String toString() + { + return change + " " + node; + } + } + + public static class StatusChange extends Event + { + public enum Status { UP, DOWN } + + public final Status status; + public final InetSocketAddress node; + + private StatusChange(Status status, InetSocketAddress node) + { + super(Type.STATUS_CHANGE); + this.status = status; + this.node = node; + } + + public static StatusChange nodeUp(InetAddress host, int port) + { + return new StatusChange(Status.UP, new InetSocketAddress(host, port)); + } + + public static StatusChange nodeDown(InetAddress host, int port) + { + return new StatusChange(Status.DOWN, new InetSocketAddress(host, port)); + } + + // Assumes the type has already by been deserialized + private static StatusChange deserializeEvent(ChannelBuffer cb) + { + Status status = Enum.valueOf(Status.class, CBUtil.readString(cb).toUpperCase()); + InetSocketAddress node = CBUtil.readInet(cb); + return new StatusChange(status, node); + } + + protected ChannelBuffer serializeEvent() + { + return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(status.toString()), + CBUtil.inetToCB(node)); + } + + @Override + public String toString() + { + return status + " " + node; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Frame.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 3f85b1a..b164998 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -140,21 +140,19 @@ public class Frame public static class Decoder extends LengthFieldBasedFrameDecoder { private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB - private final Connection.Tracker tracker; private final Connection connection; public Decoder(Connection.Tracker tracker, Connection.Factory factory) { super(MAX_FRAME_LENTH, 4, 4); - this.tracker = tracker; - this.connection = factory.newConnection(); + this.connection = factory.newConnection(tracker); } @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - tracker.addConnection(e.getChannel(), connection); + connection.registerChannel(e.getChannel()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 85de3e4..81df682 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -62,7 +62,9 @@ public abstract class Message QUERY (7, Direction.REQUEST, QueryMessage.codec), RESULT (8, Direction.RESPONSE, ResultMessage.codec), PREPARE (9, Direction.REQUEST, PrepareMessage.codec), - EXECUTE (10, Direction.REQUEST, ExecuteMessage.codec); + EXECUTE (10, Direction.REQUEST, ExecuteMessage.codec), + REGISTER (11, Direction.REQUEST, RegisterMessage.codec), + EVENT (12, Direction.RESPONSE, EventMessage.codec); public final int opcode; public final Direction direction; @@ -201,7 +203,8 @@ 
public abstract class Message try { - Connection connection = request.connection(); + assert request.connection() instanceof ServerConnection; + ServerConnection connection = (ServerConnection)request.connection(); connection.validateNewMessage(request.type); logger.debug("Received: " + request); @@ -209,7 +212,7 @@ public abstract class Message Response response = request.execute(); response.setStreamId(request.getStreamId()); response.attach(connection); - response.connection().applyStateTransition(request.type, response.type); + connection.applyStateTransition(request.type, response.type); logger.debug("Responding: " + response); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index fda38cf..74885b7 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -19,6 +19,7 @@ package org.apache.cassandra.transport; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.EnumMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,7 +34,9 @@ import org.jboss.netty.logging.Slf4JLoggerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.gms.*; import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.transport.messages.EventMessage; public class Server implements CassandraDaemon.Server { @@ -55,6 +58,7 @@ public class Server implements CassandraDaemon.Server public Server(InetSocketAddress socket) { this.socket = socket; + Gossiper.instance.register(new EventNotifier(this)); } public Server(String hostname, int port) @@ -115,15 +119,36 @@ public class Server implements CassandraDaemon.Server executionHandler = null; } 
- private static class ConnectionTracker implements Connection.Tracker + public static class ConnectionTracker implements Connection.Tracker { - public final ChannelLocal<Connection> openedConnections = new ChannelLocal<Connection>(true); public final ChannelGroup allChannels = new DefaultChannelGroup(); + private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class); + + public ConnectionTracker() + { + for (Event.Type type : Event.Type.values()) + groups.put(type, new DefaultChannelGroup(type.toString())); + } public void addConnection(Channel ch, Connection connection) { allChannels.add(ch); - openedConnections.set(ch, connection); + } + + public void register(Event.Type type, Channel ch) + { + groups.get(type).add(ch); + } + + public void unregister(Channel ch) + { + for (ChannelGroup group : groups.values()) + group.remove(ch); + } + + public void send(Event event) + { + groups.get(event.type).write(new EventMessage(event)); } public void closeAll() @@ -155,7 +180,7 @@ public class Server implements CassandraDaemon.Server //pipeline.addLast("debug", new LoggingHandler()); - pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionTracker, Connection.SERVER_FACTORY)); + pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionTracker, ServerConnection.FACTORY)); pipeline.addLast("frameEncoder", frameEncoder); pipeline.addLast("frameDecompressor", frameDecompressor); @@ -171,4 +196,45 @@ public class Server implements CassandraDaemon.Server return pipeline; } } + + private static class EventNotifier implements IEndpointStateChangeSubscriber + { + private final Server server; + + private EventNotifier(Server server) + { + this.server = server; + } + + public void onJoin(InetAddress endpoint, EndpointState epState) + { + // TODO: we don't gossip the native protocol ip/port yet, so use the + // endpoint address and ip on which this server is listening instead. 
+ server.connectionTracker.send(Event.TopologyChange.newNode(endpoint, server.socket.getPort())); + } + + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + { + } + + public void onAlive(InetAddress endpoint, EndpointState state) + { + server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, server.socket.getPort())); + } + + public void onDead(InetAddress endpoint, EndpointState state) + { + server.connectionTracker.send(Event.StatusChange.nodeDown(endpoint, server.socket.getPort())); + } + + public void onRemove(InetAddress endpoint) + { + server.connectionTracker.send(Event.TopologyChange.removedNode(endpoint, server.socket.getPort())); + } + + public void onRestart(InetAddress endpoint, EndpointState state) + { + server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, server.socket.getPort())); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java new file mode 100644 index 0000000..561828d --- /dev/null +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport; + +import org.apache.cassandra.service.ClientState; + +public class ServerConnection extends Connection +{ + public static final Factory FACTORY = new Factory() + { + public Connection newConnection(Connection.Tracker tracker) + { + return new ServerConnection(tracker); + } + }; + + private enum State { UNINITIALIZED, AUTHENTICATION, READY; } + + private final ClientState clientState; + private volatile State state; + + public ServerConnection(Connection.Tracker tracker) + { + super(tracker); + this.clientState = new ClientState(); + this.state = State.UNINITIALIZED; + } + + public ClientState clientState() + { + return clientState; + } + + public void validateNewMessage(Message.Type type) + { + switch (state) + { + case UNINITIALIZED: + if (type != Message.Type.STARTUP && type != Message.Type.OPTIONS) + throw new ProtocolException(String.format("Unexpected message %s, expecting STARTUP or OPTIONS", type)); + break; + case AUTHENTICATION: + if (type != Message.Type.CREDENTIALS) + throw new ProtocolException(String.format("Unexpected message %s, needs authentication through CREDENTIALS message", type)); + break; + case READY: + if (type == Message.Type.STARTUP) + throw new ProtocolException("Unexpected message STARTUP, the connection is already initialized"); + break; + default: + throw new AssertionError(); + } + } + + public void applyStateTransition(Message.Type requestType, Message.Type responseType) + { + switch (state) + { + case UNINITIALIZED: + if (requestType == Message.Type.STARTUP) + { + if 
(responseType == Message.Type.AUTHENTICATE) + state = State.AUTHENTICATION; + else if (responseType == Message.Type.READY) + state = State.READY; + } + break; + case AUTHENTICATION: + assert requestType == Message.Type.CREDENTIALS; + if (responseType == Message.Type.READY) + state = State.READY; + case READY: + break; + default: + throw new AssertionError(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 66cf5ae..ea0a3df 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -50,15 +50,15 @@ public class SimpleClient public final int port; protected final ResponseHandler responseHandler = new ResponseHandler(); - protected final ClientConnection connection = new ClientConnection(); protected final Connection.Tracker tracker = new ConnectionTracker(); + protected final Connection connection = new Connection(tracker); protected ClientBootstrap bootstrap; protected Channel channel; protected ChannelFuture lastWriteFuture; private final Connection.Factory connectionFactory = new Connection.Factory() { - public Connection newConnection() + public Connection newConnection(Connection.Tracker tracker) { return connection; } @@ -164,18 +164,6 @@ public class SimpleClient } } - protected static class ClientConnection extends Connection - { - public ClientState clientState() - { - return null; - } - - public void validateNewMessage(Message.Type type) {} - - public void applyStateTransition(Message.Type requestType, Message.Type responseType) {} - } - // Stateless handlers private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder(); private static final Message.ProtocolEncoder 
messageEncoder = new Message.ProtocolEncoder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java index e318f8e..c103c93 100644 --- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java @@ -25,6 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ServerConnection; import org.apache.cassandra.thrift.AuthenticationException; /** @@ -77,7 +78,7 @@ public class CredentialsMessage extends Message.Request { try { - connection.clientState().login(credentials); + ((ServerConnection)connection).clientState().login(credentials); return new ReadyMessage(); } catch (AuthenticationException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/EventMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java new file mode 100644 index 0000000..0028395 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport.messages; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import org.apache.cassandra.transport.Event; +import org.apache.cassandra.transport.Message; + +public class EventMessage extends Message.Response +{ + public static final Message.Codec<EventMessage> codec = new Message.Codec<EventMessage>() + { + public EventMessage decode(ChannelBuffer body) + { + return new EventMessage(Event.deserialize(body)); + } + + public ChannelBuffer encode(EventMessage msg) + { + return msg.event.serialize(); + } + }; + + public final Event event; + + public EventMessage(Event event) + { + super(Message.Type.EVENT); + this.event = event; + this.setStreamId(-1); + } + + public ChannelBuffer encode() + { + return codec.encode(this); + } + + @Override + public String toString() + { + return "EVENT " + event; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 9d2d0e4..4172862 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -83,12 +83,13 @@ public class ExecuteMessage extends Message.Request { try { - CQLStatement statement = connection.clientState().getCQL3Prepared().get(statementId); + ServerConnection c = (ServerConnection)connection; + CQLStatement statement = c.clientState().getCQL3Prepared().get(statementId); if (statement == null) throw new InvalidRequestException(String.format("Prepared query with ID %d not found", statementId)); - return QueryProcessor.processPrepared(statement, connection.clientState(), values); + return QueryProcessor.processPrepared(statement, c.clientState(), values); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 4141763..5c2636a 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -55,7 +55,7 @@ public class PrepareMessage extends Message.Request { try { - return QueryProcessor.prepare(query, connection.clientState()); + return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 399fe95..c291e1d 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java 
@@ -61,7 +61,7 @@ public class QueryMessage extends Message.Request { try { - return QueryProcessor.process(query, connection.clientState()); + return QueryProcessor.process(query, ((ServerConnection)connection).clientState()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java new file mode 100644 index 0000000..656e03c --- /dev/null +++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.*;
+
+/**
+ * REGISTER request: carries a list of event types and, when executed,
+ * registers the connection's channel with the server connection tracker
+ * once for each listed type, then answers with a {@link ReadyMessage}.
+ */
+public class RegisterMessage extends Message.Request
+{
+    /**
+     * Codec for the message body, which is a single string list holding
+     * one event type name per entry.
+     */
+    public static final Message.Codec<RegisterMessage> codec = new Message.Codec<RegisterMessage>()
+    {
+        /**
+         * Reads the string list from {@code body} and maps every entry to
+         * the {@link Event.Type} constant of the same name, ignoring case.
+         *
+         * @param body buffer positioned at the start of the message body
+         * @return a message holding the decoded types, in wire order
+         * @throws IllegalArgumentException if an entry matches no
+         *         {@link Event.Type} constant
+         */
+        public RegisterMessage decode(ChannelBuffer body)
+        {
+            List<String> names = CBUtil.readStringList(body);
+            List<Event.Type> eventTypes = new ArrayList<Event.Type>(names.size());
+            for (String name : names)
+            {
+                // Upper-case with Locale.ROOT: the no-arg toUpperCase() follows the
+                // JVM default locale (e.g. Turkish maps 'i' to a dotted capital),
+                // which could make a valid name miss its enum constant.
+                eventTypes.add(Enum.valueOf(Event.Type.class, name.toUpperCase(Locale.ROOT)));
+            }
+            return new RegisterMessage(eventTypes);
+        }
+
+        /**
+         * Writes the event types of {@code msg} as a string list, using
+         * each type's {@code toString()} as its wire name.
+         *
+         * @param msg the message to serialize
+         * @return a newly allocated dynamic buffer containing the body
+         */
+        public ChannelBuffer encode(RegisterMessage msg)
+        {
+            List<String> names = new ArrayList<String>(msg.eventTypes.size());
+            for (Event.Type type : msg.eventTypes)
+                names.add(type.toString());
+            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringList(cb, names);
+            return cb;
+        }
+    };
+
+    /** Event types carried by this request; the list is stored as given, not copied. */
+    public final List<Event.Type> eventTypes;
+
+    /**
+     * @param eventTypes the event types to register for
+     */
+    public RegisterMessage(List<Event.Type> eventTypes)
+    {
+        super(Message.Type.REGISTER);
+        this.eventTypes = eventTypes;
+    }
+
+    /**
+     * Registers the connection's channel with the tracker for every type
+     * in {@link #eventTypes}.
+     *
+     * The asserts state the expectation that this runs on a
+     * {@link ServerConnection} whose tracker is a
+     * {@link Server.ConnectionTracker}; with assertions disabled a
+     * mismatch surfaces as a {@link ClassCastException} instead.
+     *
+     * @return a {@link ReadyMessage} once all types have been registered
+     */
+    public Response execute()
+    {
+        assert connection instanceof ServerConnection;
+        Connection.Tracker tracker = ((ServerConnection)connection).getTracker();
+        assert tracker instanceof Server.ConnectionTracker;
+        for (Event.Type type : eventTypes)
+            ((Server.ConnectionTracker)tracker).register(type, connection().channel());
+        return new ReadyMessage();
+    }
+
+    /**
+     * @return this message's body, serialized through {@link #codec}
+     */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "REGISTER " + eventTypes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 23199f1..24430e0 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -69,12 +69,15 @@ public class StartupMessage extends Message.Request { try { + assert connection instanceof ServerConnection; + ServerConnection c = (ServerConnection)connection; + String cqlVersion = options.get(CQL_VERSION); if (cqlVersion == null) throw new ProtocolException("Missing value CQL_VERSION in STARTUP message"); - connection.clientState().setCQLVersion(cqlVersion); - if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) + c.clientState().setCQLVersion(cqlVersion); + if (c.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) throw new ProtocolException(String.format("CQL version %s is not support by the binary protocol (supported version are >= 3.0.0)", cqlVersion)); if (options.containsKey(COMPRESSION)) @@ -92,7 +95,7 @@ public class StartupMessage extends Message.Request } } - if (connection.clientState().isLogged()) + if (c.clientState().isLogged()) return new ReadyMessage(); else return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());