Updated Branches:
  refs/heads/trunk f1711794c -> f199fa39b

Adds events push to binary protocol

patch by slebresne; reviewed by thepaul for CASSANDRA-4480


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f199fa39
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f199fa39
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f199fa39

Branch: refs/heads/trunk
Commit: f199fa39bb7a536a203a5154de24ae67c137ec23
Parents: f171179
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Aug 8 17:12:20 2012 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Sep 7 11:02:03 2012 +0200

----------------------------------------------------------------------
 doc/native_protocol.spec                           |   59 ++++++-
 .../org/apache/cassandra/transport/CBUtil.java     |   29 +++
 .../org/apache/cassandra/transport/Client.java     |   13 ++
 .../org/apache/cassandra/transport/Connection.java |   97 ++--------
 src/java/org/apache/cassandra/transport/Event.java |  146 +++++++++++++++
 src/java/org/apache/cassandra/transport/Frame.java |    6 +-
 .../org/apache/cassandra/transport/Message.java    |    9 +-
 .../org/apache/cassandra/transport/Server.java     |   74 +++++++-
 .../cassandra/transport/ServerConnection.java      |   93 +++++++++
 .../apache/cassandra/transport/SimpleClient.java   |   16 +--
 .../transport/messages/CredentialsMessage.java     |    3 +-
 .../cassandra/transport/messages/EventMessage.java |   60 ++++++
 .../transport/messages/ExecuteMessage.java         |    5 +-
 .../transport/messages/PrepareMessage.java         |    2 +-
 .../cassandra/transport/messages/QueryMessage.java |    2 +-
 .../transport/messages/RegisterMessage.java        |   80 ++++++++
 .../transport/messages/StartupMessage.java         |    9 +-
 17 files changed, 588 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 1de7a95..07d5c69 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -8,8 +8,9 @@ Table of Contents
   2. Frame header
     2.1. version
     2.2. flags
-    2.3. opcode
-    2.4. length
+    2.3. stream
+    2.4. opcode
+    2.5. length
   3. Notations
   4. Messages
     4.1. Requests
@@ -19,6 +20,7 @@ Table of Contents
       4.1.4. QUERY
       4.1.5. PREPARE
       4.1.6. EXECUTE
+      4.1.7. REGISTER
     4.2. Responses
       4.2.1. ERROR
       4.2.2. READY
@@ -29,6 +31,7 @@ Table of Contents
         4.2.5.2. Rows
         4.2.5.3. Set_keyspace
         4.2.5.4. Prepared
+      4.2.6. EVENT
   5. Compression
   6. Error codes
 
@@ -100,9 +103,10 @@ Table of Contents
 
   A frame has a stream id (one signed byte). When sending request messages, 
this
   stream id must be set by the client to a positive byte (negative stream id
-  are reserved for future stream initiated by the server). If a client sends a
-  request message with the stream id X, it is guaranteed that the stream id of
-  the response to that message will be X.
+  are reserved for streams initiated by the server; currently all EVENT 
messages
+  (section 4.2.6) have a streamId of -1). If a client sends a request message
+  with the stream id X, it is guaranteed that the stream id of the response to
+  that message will be X.
 
   This allow to deal with the asynchronous nature of the protocol. If a client
   sends multiple messages simultaneously (without waiting for responses), there
@@ -131,11 +135,13 @@ Table of Contents
     0x08    RESULT
     0x09    PREPARE
     0x0A    EXECUTE
+    0x0B    REGISTER
+    0x0C    EVENT
 
   Messages are described in Section 4.
 
 
-2.4. length
+2.5. length
 
   A 4 byte integer representing the length of the body of the frame (note:
   currently a frame is limited to 256MB in length).
@@ -160,6 +166,11 @@ Table of Contents
                    of size 0). The supported id (and the corresponding <value>)
                    will be described when this is used.
     [option list]  A [short] n, followed by n [option].
+    [inet]         An address (ip and port) to a node. It consists of one
+                   [byte] n, that represents the address size, followed by n
+                   [byte] representing the IP address (in practice n can only be
+                   either 4 (IPv4) or 16 (IPv6)), followed by one [int]
+                   representing the port.
 
     [string map]      A [short] n, followed by n pair <k><v> where <k> and <v>
                       are [string].
@@ -247,6 +258,21 @@ Table of Contents
   The response from the server will be a RESULT message.
 
 
+4.1.7. REGISTER
+
+  Register this connection to receive some type of events. The body of the
+  message is a [string list] representing the event types to register for. See
+  section 4.2.6 for the list of valid event types.
+
+  The response to a REGISTER message will be a READY message.
+
+  Please note that if a client driver maintains multiple connections to a
+  Cassandra node and/or connections to multiple nodes, it is advised to
+  dedicate a handful of connections to receive events, but to *not* register
+  for events on all connections, as this would only result in receiving
+  the same event messages multiple times, wasting bandwidth.
+
+
 4.2. Responses
 
   This section describes the content of the frame body for the different
@@ -388,6 +414,27 @@ Table of Contents
     - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2).
 
 
+4.2.6. EVENT
+
+  An event pushed by the server. A client will only receive events for the
+  types it has REGISTERed for. The body of an EVENT message will start with a
+  [string] representing the event type. The rest of the message depends on the
+  event type. The valid event types are:
+    - "TOPOLOGY_CHANGE": events related to change in the cluster topology.
+      Currently, events are sent when new nodes are added to the cluster, and
+      when nodes are removed. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of
+      the new/removed node.
+    - "STATUS_CHANGE": events related to change of node status. Currently,
+      up/down events are sent. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of status change ("UP" or "DOWN") followed by the address of the
+      concerned node.
+
+  All EVENT messages have a streamId of -1 (Section 2.3).
+
+
 5. Compression
 
   Frame compression is supported by the protocol, but then only the frame body

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java 
b/src/java/org/apache/cassandra/transport/CBUtil.java
index 44ae64d..b977f35 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.transport;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
@@ -182,6 +185,32 @@ public abstract class CBUtil
         return length < 0 ? null : cb.readSlice(length).toByteBuffer();
     }
 
+    public static InetSocketAddress readInet(ChannelBuffer cb)
+    {
+        int addrSize = cb.readByte();
+        byte[] address = new byte[addrSize];
+        cb.readBytes(address);
+        int port = cb.readInt();
+        try
+        {
+            return new InetSocketAddress(InetAddress.getByAddress(address), 
port);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new ProtocolException(String.format("Invalid IP address 
(%d.%d.%d.%d) while deserializing inet address", address[0], address[1], 
address[2], address[3]));
+        }
+    }
+
+    public static ChannelBuffer inetToCB(InetSocketAddress inet)
+    {
+        byte[] address = inet.getAddress().getAddress();
+        ChannelBuffer cb = ChannelBuffers.buffer(1 + address.length + 4);
+        cb.writeByte(address.length);
+        cb.writeBytes(address);
+        cb.writeInt(inet.getPort());
+        return cb;
+    }
+
     public static class BufferBuilder
     {
         private final int size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java 
b/src/java/org/apache/cassandra/transport/Client.java
index 2cf815f..b9e00fa 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -150,6 +150,19 @@ public class Client extends SimpleClient
             }
             return msg;
         }
+        else if (msgType.equals("REGISTER"))
+        {
+            String type = line.substring(9).toUpperCase();
+            try
+            {
+                return new 
RegisterMessage(Collections.singletonList(Enum.valueOf(Event.Type.class, 
type)));
+            }
+            catch (IllegalArgumentException e)
+            {
+                System.err.println("[ERROR] Unknown event type: " + type);
+                return null;
+            }
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java 
b/src/java/org/apache/cassandra/transport/Connection.java
index 9f874a3..67f06a4 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -19,19 +19,16 @@ package org.apache.cassandra.transport;
 
 import org.jboss.netty.channel.Channel;
 
-import org.apache.cassandra.service.ClientState;
-
-public abstract class Connection
+public class Connection
 {
-    public static final Factory SERVER_FACTORY = new Factory()
-    {
-        public Connection newConnection()
-        {
-            return new ServerConnection();
-        }
-    };
+    private volatile FrameCompressor frameCompressor;
+    private volatile Channel channel;
+    private final Tracker tracker;
 
-    private FrameCompressor frameCompressor;
+    public Connection(Tracker tracker)
+    {
+        this.tracker = tracker;
+    }
 
     public void setCompressor(FrameCompressor compressor)
     {
@@ -43,77 +40,25 @@ public abstract class Connection
         return frameCompressor;
     }
 
-    public abstract void validateNewMessage(Message.Type type);
-    public abstract void applyStateTransition(Message.Type requestType, 
Message.Type responseType);
-    public abstract ClientState clientState();
-
-    public interface Factory
+    public Tracker getTracker()
     {
-        public Connection newConnection();
+        return tracker;
     }
 
-    private static class ServerConnection extends Connection
+    public void registerChannel(Channel ch)
     {
-        private enum State { UNINITIALIZED, AUTHENTICATION, READY; }
-
-        private final ClientState clientState;
-        private State state;
-
-        public ServerConnection()
-        {
-            this.clientState = new ClientState();
-            this.state = State.UNINITIALIZED;
-        }
-
-        public ClientState clientState()
-        {
-            return clientState;
-        }
+        channel = ch;
+        tracker.addConnection(ch, this);
+    }
 
-        public void validateNewMessage(Message.Type type)
-        {
-            switch (state)
-            {
-                case UNINITIALIZED:
-                    if (type != Message.Type.STARTUP && type != 
Message.Type.OPTIONS)
-                        throw new ProtocolException(String.format("Unexpected 
message %s, expecting STARTUP or OPTIONS", type));
-                    break;
-                case AUTHENTICATION:
-                    if (type != Message.Type.CREDENTIALS)
-                        throw new ProtocolException(String.format("Unexpected 
message %s, needs authentication through CREDENTIALS message", type));
-                    break;
-                case READY:
-                    if (type == Message.Type.STARTUP)
-                        throw new ProtocolException("Unexpected message 
STARTUP, the connection is already initialized");
-                    break;
-                default:
-                    throw new AssertionError();
-            }
-        }
+    public Channel channel()
+    {
+        return channel;
+    }
 
-        public void applyStateTransition(Message.Type requestType, 
Message.Type responseType)
-        {
-            switch (state)
-            {
-                case UNINITIALIZED:
-                    if (requestType == Message.Type.STARTUP)
-                    {
-                        if (responseType == Message.Type.AUTHENTICATE)
-                            state = State.AUTHENTICATION;
-                        else if (responseType == Message.Type.READY)
-                            state = State.READY;
-                    }
-                    break;
-                case AUTHENTICATION:
-                    assert requestType == Message.Type.CREDENTIALS;
-                    if (responseType == Message.Type.READY)
-                        state = State.READY;
-                case READY:
-                    break;
-                default:
-                    throw new AssertionError();
-            }
-        }
+    public interface Factory
+    {
+        public Connection newConnection(Tracker tracker);
     }
 
     public interface Tracker

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java 
b/src/java/org/apache/cassandra/transport/Event.java
new file mode 100644
index 0000000..849caff
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+public abstract class Event
+{
+    public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE }
+
+    public final Type type;
+
+    private Event(Type type)
+    {
+        this.type = type;
+    }
+
+    public static Event deserialize(ChannelBuffer cb)
+    {
+        switch (Enum.valueOf(Type.class, CBUtil.readString(cb).toUpperCase()))
+        {
+            case TOPOLOGY_CHANGE:
+                return TopologyChange.deserializeEvent(cb);
+            case STATUS_CHANGE:
+                return StatusChange.deserializeEvent(cb);
+        }
+        throw new AssertionError();
+    }
+
+    public ChannelBuffer serialize()
+    {
+        return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(type.toString()),
+                                            serializeEvent());
+    }
+
+    protected abstract ChannelBuffer serializeEvent();
+
+    public static class TopologyChange extends Event
+    {
+        public enum Change { NEW_NODE, REMOVED_NODE }
+
+        public final Change change;
+        public final InetSocketAddress node;
+
+        private TopologyChange(Change change, InetSocketAddress node)
+        {
+            super(Type.TOPOLOGY_CHANGE);
+            this.change = change;
+            this.node = node;
+        }
+
+        public static TopologyChange newNode(InetAddress host, int port)
+        {
+            return new TopologyChange(Change.NEW_NODE, new 
InetSocketAddress(host, port));
+        }
+
+        public static TopologyChange removedNode(InetAddress host, int port)
+        {
+            return new TopologyChange(Change.REMOVED_NODE, new 
InetSocketAddress(host, port));
+        }
+
+        // Assumes the type has already been deserialized
+        private static TopologyChange deserializeEvent(ChannelBuffer cb)
+        {
+            Change change = Enum.valueOf(Change.class, 
CBUtil.readString(cb).toUpperCase());
+            InetSocketAddress node = CBUtil.readInet(cb);
+            return new TopologyChange(change, node);
+        }
+
+        protected ChannelBuffer serializeEvent()
+        {
+            return 
ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(change.toString()),
+                                                CBUtil.inetToCB(node));
+        }
+
+        @Override
+        public String toString()
+        {
+            return change + " " + node;
+        }
+    }
+
+    public static class StatusChange extends Event
+    {
+        public enum Status { UP, DOWN }
+
+        public final Status status;
+        public final InetSocketAddress node;
+
+        private StatusChange(Status status, InetSocketAddress node)
+        {
+            super(Type.STATUS_CHANGE);
+            this.status = status;
+            this.node = node;
+        }
+
+        public static StatusChange nodeUp(InetAddress host, int port)
+        {
+            return new StatusChange(Status.UP, new InetSocketAddress(host, 
port));
+        }
+
+        public static StatusChange nodeDown(InetAddress host, int port)
+        {
+            return new StatusChange(Status.DOWN, new InetSocketAddress(host, 
port));
+        }
+
+        // Assumes the type has already been deserialized
+        private static StatusChange deserializeEvent(ChannelBuffer cb)
+        {
+            Status status = Enum.valueOf(Status.class, 
CBUtil.readString(cb).toUpperCase());
+            InetSocketAddress node = CBUtil.readInet(cb);
+            return new StatusChange(status, node);
+        }
+
+        protected ChannelBuffer serializeEvent()
+        {
+            return 
ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(status.toString()),
+                                                CBUtil.inetToCB(node));
+        }
+
+        @Override
+        public String toString()
+        {
+            return status + " " + node;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index 3f85b1a..b164998 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -140,21 +140,19 @@ public class Frame
     public static class Decoder extends LengthFieldBasedFrameDecoder
     {
         private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB
-        private final Connection.Tracker tracker;
         private final Connection connection;
 
         public Decoder(Connection.Tracker tracker, Connection.Factory factory)
         {
             super(MAX_FRAME_LENTH, 4, 4);
-            this.tracker = tracker;
-            this.connection = factory.newConnection();
+            this.connection = factory.newConnection(tracker);
         }
 
         @Override
         public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
         throws Exception
         {
-            tracker.addConnection(e.getChannel(), connection);
+            connection.registerChannel(e.getChannel());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 85de3e4..81df682 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -62,7 +62,9 @@ public abstract class Message
         QUERY        (7,  Direction.REQUEST,  QueryMessage.codec),
         RESULT       (8,  Direction.RESPONSE, ResultMessage.codec),
         PREPARE      (9,  Direction.REQUEST,  PrepareMessage.codec),
-        EXECUTE      (10, Direction.REQUEST,  ExecuteMessage.codec);
+        EXECUTE      (10, Direction.REQUEST,  ExecuteMessage.codec),
+        REGISTER     (11, Direction.REQUEST,  RegisterMessage.codec),
+        EVENT        (12, Direction.RESPONSE, EventMessage.codec);
 
         public final int opcode;
         public final Direction direction;
@@ -201,7 +203,8 @@ public abstract class Message
 
             try
             {
-                Connection connection = request.connection();
+                assert request.connection() instanceof ServerConnection;
+                ServerConnection connection = 
(ServerConnection)request.connection();
                 connection.validateNewMessage(request.type);
 
                 logger.debug("Received: " + request);
@@ -209,7 +212,7 @@ public abstract class Message
                 Response response = request.execute();
                 response.setStreamId(request.getStreamId());
                 response.attach(connection);
-                response.connection().applyStateTransition(request.type, 
response.type);
+                connection.applyStateTransition(request.type, response.type);
 
                 logger.debug("Responding: " + response);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index fda38cf..74885b7 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.EnumMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -33,7 +34,9 @@ import org.jboss.netty.logging.Slf4JLoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.transport.messages.EventMessage;
 
 public class Server implements CassandraDaemon.Server
 {
@@ -55,6 +58,7 @@ public class Server implements CassandraDaemon.Server
     public Server(InetSocketAddress socket)
     {
         this.socket = socket;
+        Gossiper.instance.register(new EventNotifier(this));
     }
 
     public Server(String hostname, int port)
@@ -115,15 +119,36 @@ public class Server implements CassandraDaemon.Server
         executionHandler = null;
     }
 
-    private static class ConnectionTracker implements Connection.Tracker
+    public static class ConnectionTracker implements Connection.Tracker
     {
-        public final ChannelLocal<Connection> openedConnections = new 
ChannelLocal<Connection>(true);
         public final ChannelGroup allChannels = new DefaultChannelGroup();
+        private final EnumMap<Event.Type, ChannelGroup> groups = new 
EnumMap<Event.Type, ChannelGroup>(Event.Type.class);
+
+        public ConnectionTracker()
+        {
+            for (Event.Type type : Event.Type.values())
+                groups.put(type, new DefaultChannelGroup(type.toString()));
+        }
 
         public void addConnection(Channel ch, Connection connection)
         {
             allChannels.add(ch);
-            openedConnections.set(ch, connection);
+        }
+
+        public void register(Event.Type type, Channel ch)
+        {
+            groups.get(type).add(ch);
+        }
+
+        public void unregister(Channel ch)
+        {
+            for (ChannelGroup group : groups.values())
+                group.remove(ch);
+        }
+
+        public void send(Event event)
+        {
+            groups.get(event.type).write(new EventMessage(event));
         }
 
         public void closeAll()
@@ -155,7 +180,7 @@ public class Server implements CassandraDaemon.Server
 
             //pipeline.addLast("debug", new LoggingHandler());
 
-            pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionTracker, Connection.SERVER_FACTORY));
+            pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionTracker, ServerConnection.FACTORY));
             pipeline.addLast("frameEncoder", frameEncoder);
 
             pipeline.addLast("frameDecompressor", frameDecompressor);
@@ -171,4 +196,45 @@ public class Server implements CassandraDaemon.Server
             return pipeline;
       }
     }
+
+    private static class EventNotifier implements 
IEndpointStateChangeSubscriber
+    {
+        private final Server server;
+
+        private EventNotifier(Server server)
+        {
+            this.server = server;
+        }
+
+        public void onJoin(InetAddress endpoint, EndpointState epState)
+        {
+            // TODO: we don't gossip the native protocol ip/port yet, so use 
the
+            // endpoint address and port on which this server is listening 
instead.
+            
server.connectionTracker.send(Event.TopologyChange.newNode(endpoint, 
server.socket.getPort()));
+        }
+
+        public void onChange(InetAddress endpoint, ApplicationState state, 
VersionedValue value)
+        {
+        }
+
+        public void onAlive(InetAddress endpoint, EndpointState state)
+        {
+            server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, 
server.socket.getPort()));
+        }
+
+        public void onDead(InetAddress endpoint, EndpointState state)
+        {
+            
server.connectionTracker.send(Event.StatusChange.nodeDown(endpoint, 
server.socket.getPort()));
+        }
+
+        public void onRemove(InetAddress endpoint)
+        {
+            
server.connectionTracker.send(Event.TopologyChange.removedNode(endpoint, 
server.socket.getPort()));
+        }
+
+        public void onRestart(InetAddress endpoint, EndpointState state)
+        {
+            server.connectionTracker.send(Event.StatusChange.nodeUp(endpoint, 
server.socket.getPort()));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java 
b/src/java/org/apache/cassandra/transport/ServerConnection.java
new file mode 100644
index 0000000..561828d
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import org.apache.cassandra.service.ClientState;
+
+public class ServerConnection extends Connection
+{
+    public static final Factory FACTORY = new Factory()
+    {
+        public Connection newConnection(Connection.Tracker tracker)
+        {
+            return new ServerConnection(tracker);
+        }
+    };
+
+    private enum State { UNINITIALIZED, AUTHENTICATION, READY; }
+
+    private final ClientState clientState;
+    private volatile State state;
+
+    public ServerConnection(Connection.Tracker tracker)
+    {
+        super(tracker);
+        this.clientState = new ClientState();
+        this.state = State.UNINITIALIZED;
+    }
+
+    public ClientState clientState()
+    {
+        return clientState;
+    }
+
+    public void validateNewMessage(Message.Type type)
+    {
+        switch (state)
+        {
+            case UNINITIALIZED:
+                if (type != Message.Type.STARTUP && type != 
Message.Type.OPTIONS)
+                    throw new ProtocolException(String.format("Unexpected 
message %s, expecting STARTUP or OPTIONS", type));
+                break;
+            case AUTHENTICATION:
+                if (type != Message.Type.CREDENTIALS)
+                    throw new ProtocolException(String.format("Unexpected 
message %s, needs authentication through CREDENTIALS message", type));
+                break;
+            case READY:
+                if (type == Message.Type.STARTUP)
+                    throw new ProtocolException("Unexpected message STARTUP, 
the connection is already initialized");
+                break;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public void applyStateTransition(Message.Type requestType, Message.Type 
responseType)
+    {
+        switch (state)
+        {
+            case UNINITIALIZED:
+                if (requestType == Message.Type.STARTUP)
+                {
+                    if (responseType == Message.Type.AUTHENTICATE)
+                        state = State.AUTHENTICATION;
+                    else if (responseType == Message.Type.READY)
+                        state = State.READY;
+                }
+                break;
+            case AUTHENTICATION:
+                assert requestType == Message.Type.CREDENTIALS;
+                if (responseType == Message.Type.READY)
+                    state = State.READY;
+            case READY:
+                break;
+            default:
+                throw new AssertionError();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 66cf5ae..ea0a3df 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -50,15 +50,15 @@ public class SimpleClient
     public final int port;
 
     protected final ResponseHandler responseHandler = new ResponseHandler();
-    protected final ClientConnection connection = new ClientConnection();
     protected final Connection.Tracker tracker = new ConnectionTracker();
+    protected final Connection connection = new Connection(tracker);
     protected ClientBootstrap bootstrap;
     protected Channel channel;
     protected ChannelFuture lastWriteFuture;
 
     private final Connection.Factory connectionFactory = new 
Connection.Factory()
     {
-        public Connection newConnection()
+        public Connection newConnection(Connection.Tracker tracker)
         {
             return connection;
         }
@@ -164,18 +164,6 @@ public class SimpleClient
         }
     }
 
-    protected static class ClientConnection extends Connection
-    {
-        public ClientState clientState()
-        {
-            return null;
-        }
-
-        public void validateNewMessage(Message.Type type) {}
-
-        public void applyStateTransition(Message.Type requestType, 
Message.Type responseType) {}
-    }
-
     // Stateless handlers
     private static final Message.ProtocolDecoder messageDecoder = new 
Message.ProtocolDecoder();
     private static final Message.ProtocolEncoder messageEncoder = new 
Message.ProtocolEncoder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java 
b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index e318f8e..c103c93 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -25,6 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ServerConnection;
 import org.apache.cassandra.thrift.AuthenticationException;
 
 /**
@@ -77,7 +78,7 @@ public class CredentialsMessage extends Message.Request
     {
         try
         {
-            connection.clientState().login(credentials);
+            ((ServerConnection)connection).clientState().login(credentials);
             return new ReadyMessage();
         }
         catch (AuthenticationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java 
b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
new file mode 100644
index 0000000..0028395
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.Event;
+import org.apache.cassandra.transport.Message;
+
+public class EventMessage extends Message.Response
+{ // Response of type EVENT that wraps a single Event.
+    public static final Message.Codec<EventMessage> codec = new 
Message.Codec<EventMessage>()
+    {
+        public EventMessage decode(ChannelBuffer body)
+        {
+            return new EventMessage(Event.deserialize(body)); // the whole body is handed to Event.deserialize
+        }
+
+        public ChannelBuffer encode(EventMessage msg)
+        {
+            return msg.event.serialize(); // the encoded body is exactly the event's own serialization
+        }
+    };
+
+    public final Event event;
+
+    public EventMessage(Event event)
+    {
+        super(Message.Type.EVENT);
+        this.event = event;
+        this.setStreamId(-1); // NOTE(review): presumably -1 marks a server-pushed message not tied to any request stream - confirm against the protocol spec
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this); // delegates to the static codec above
+    }
+
+    @Override
+    public String toString()
+    {
+        return "EVENT " + event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 9d2d0e4..4172862 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -83,12 +83,13 @@ public class ExecuteMessage extends Message.Request
     {
         try
         {
-            CQLStatement statement = 
connection.clientState().getCQL3Prepared().get(statementId);
+            ServerConnection c = (ServerConnection)connection;
+            CQLStatement statement = 
c.clientState().getCQL3Prepared().get(statementId);
 
             if (statement == null)
                 throw new InvalidRequestException(String.format("Prepared 
query with ID %d not found", statementId));
 
-            return QueryProcessor.processPrepared(statement, 
connection.clientState(), values);
+            return QueryProcessor.processPrepared(statement, c.clientState(), 
values);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 4141763..5c2636a 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -55,7 +55,7 @@ public class PrepareMessage extends Message.Request
     {
         try
         {
-            return QueryProcessor.prepare(query, connection.clientState());
+            return QueryProcessor.prepare(query, 
((ServerConnection)connection).clientState());
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java 
b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 399fe95..c291e1d 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -61,7 +61,7 @@ public class QueryMessage extends Message.Request
     {
         try
         {
-            return QueryProcessor.process(query, connection.clientState());
+            return QueryProcessor.process(query, 
((ServerConnection)connection).clientState());
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java 
b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
new file mode 100644
index 0000000..656e03c
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.*;
+
+public class RegisterMessage extends Message.Request
+{ // Request of type REGISTER carrying the list of event types to register for.
+    public static final Message.Codec<RegisterMessage> codec = new 
Message.Codec<RegisterMessage>()
+    {
+        public RegisterMessage decode(ChannelBuffer body)
+        {
+            List<String> l = CBUtil.readStringList(body);
+            List<Event.Type> eventTypes = new ArrayList<Event.Type>(l.size());
+            for (String s : l) // NOTE(review): Enum.valueOf throws IllegalArgumentException for a name that is not an Event.Type constant - confirm callers handle that
+                eventTypes.add(Enum.valueOf(Event.Type.class, 
s.toUpperCase()));
+            return new RegisterMessage(eventTypes);
+        }
+
+        public ChannelBuffer encode(RegisterMessage msg)
+        {
+            List<String> l = new ArrayList<String>(msg.eventTypes.size());
+            for (Event.Type type : msg.eventTypes) // each event type is written as its toString() value
+                l.add(type.toString());
+            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringList(cb, l);
+            return cb;
+        }
+    };
+
+    public final List<Event.Type> eventTypes;
+
+    public RegisterMessage(List<Event.Type> eventTypes)
+    {
+        super(Message.Type.REGISTER);
+        this.eventTypes = eventTypes; // the caller's list is stored as-is, not copied
+    }
+
+    public Response execute()
+    { // Registers connection().channel() with the tracker once per requested event type, then returns a ReadyMessage.
+        assert connection instanceof ServerConnection; // only checked when assertions are enabled; otherwise the cast below is the only guard
+        Connection.Tracker tracker = 
((ServerConnection)connection).getTracker();
+        assert tracker instanceof Server.ConnectionTracker; // same caveat: enforced only under -ea
+        for (Event.Type type : eventTypes)
+            ((Server.ConnectionTracker)tracker).register(type, 
connection().channel());
+        return new ReadyMessage();
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this); // delegates to the static codec above
+    }
+
+    @Override
+    public String toString()
+    {
+        return "REGISTER " + eventTypes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f199fa39/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java 
b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 23199f1..24430e0 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -69,12 +69,15 @@ public class StartupMessage extends Message.Request
     {
         try
         {
+            assert connection instanceof ServerConnection;
+            ServerConnection c = (ServerConnection)connection;
+
             String cqlVersion = options.get(CQL_VERSION);
             if (cqlVersion == null)
                 throw new ProtocolException("Missing value CQL_VERSION in 
STARTUP message");
 
-            connection.clientState().setCQLVersion(cqlVersion);
-            if (connection.clientState().getCQLVersion().compareTo(new 
SemanticVersion("2.99.0")) < 0)
+            c.clientState().setCQLVersion(cqlVersion);
+            if (c.clientState().getCQLVersion().compareTo(new 
SemanticVersion("2.99.0")) < 0)
                 throw new ProtocolException(String.format("CQL version %s is 
not support by the binary protocol (supported version are >= 3.0.0)", 
cqlVersion));
 
             if (options.containsKey(COMPRESSION))
@@ -92,7 +95,7 @@ public class StartupMessage extends Message.Request
                 }
             }
 
-            if (connection.clientState().isLogged())
+            if (c.clientState().isLogged())
                 return new ReadyMessage();
             else
                 return new 
AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());

Reply via email to