Expose connected thrift + native client counts
patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-5084


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b7dd5e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b7dd5e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b7dd5e6

Branch: refs/heads/cassandra-2.0
Commit: 5b7dd5e62897d0ac13f96330e360d2262aeb0650
Parents: cf38e9e
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Oct 4 21:51:39 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Oct 4 21:51:39 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/ClientMetrics.java | 35 ++++++++++++++++++
 .../cassandra/thrift/CassandraServer.java       | 19 +++++++++-
 .../cassandra/thrift/ThriftSessionManager.java  |  5 +++
 .../org/apache/cassandra/transport/Server.java  | 37 ++++++++++++++------
 5 files changed, 86 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dc5e77..5af4e2e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Fix fat client schema pull NPE (CASSANDRA-6089)
  * Fix memtable flushing for indexed tables (CASSANDRA-6112)
  * Fix skipping columns with multiple slices (CASSANDRA-6119)
+ * Expose connected thrift + native client counts (CASSANDRA-5084)
 
 
 1.2.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/metrics/ClientMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
new file mode 100644
index 0000000..cb10ad5
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -0,0 +1,35 @@
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.Callable;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+
+public class ClientMetrics
+{
+    private static final MetricNameFactory factory = new DefaultNameFactory("Client");
+    
+    public static final ClientMetrics instance = new ClientMetrics();
+    
+    private ClientMetrics()
+    {
+    }
+
+    public void addCounter(String name, final Callable<Integer> provider)
+    {
+        Metrics.newGauge(factory.createMetricName(name), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                try
+                {
+                    return provider.call();
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1959815..65ae177 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
@@ -53,8 +54,12 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.scheduler.IRequestScheduler;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -80,6 +85,7 @@ public class CassandraServer implements Cassandra.Iface
     public CassandraServer()
     {
         requestScheduler = DatabaseDescriptor.getRequestScheduler();
+        registerMetrics();
     }
 
     public ThriftClientState state()
@@ -1877,5 +1883,16 @@ public class CassandraServer implements Cassandra.Iface
         return false;
     }
 
+    private void registerMetrics()
+    {
+        ClientMetrics.instance.addCounter("connectedThriftClients", new Callable<Integer>()
+        {
+            @Override
+            public Integer call() throws Exception
+            {
+                return ThriftSessionManager.instance.getConnectedClients();
+            }
+        });
+    }
     // main method moved to CassandraDaemon
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index bbc4bff..9a537e8 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -73,4 +73,9 @@ public class ThriftSessionManager
         if (logger.isTraceEnabled())
             logger.trace("ClientState removed for socket addr {}", socket);
     }
+    
+    public int getConnectedClients()
+    {
+        return activeSocketSessions.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b7dd5e6/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7400a8b..0ffb92b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
@@ -32,19 +33,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
-import org.apache.cassandra.service.IMigrationListener;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.transport.messages.EventMessage;
 import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -76,6 +70,7 @@ public class Server implements CassandraDaemon.Server
         EventNotifier notifier = new EventNotifier(this);
         StorageService.instance.register(notifier);
         MigrationManager.instance.register(notifier);
+        registerMetrics();
     }
 
     public Server(String hostname, int port)
@@ -140,6 +135,18 @@ public class Server implements CassandraDaemon.Server
         connectionTracker.allChannels.add(channel);
     }
 
+    private void registerMetrics()
+    {
+        ClientMetrics.instance.addCounter("connectedNativeClients", new Callable<Integer>()
+        {
+            @Override
+            public Integer call() throws Exception
+            {
+                return connectionTracker.getConnectedClients();
+            }
+        });
+    }
+
     private void close()
     {
         // Close opened connections
@@ -187,6 +194,16 @@ public class Server implements CassandraDaemon.Server
         {
             allChannels.close().awaitUninterruptibly();
         }
+
+        public int getConnectedClients()
+        {
+            /*
+              - When server is running: allChannels contains all clients' connections (channels) 
+                plus one additional channel used for the server's own bootstrap.
+               - When server is stopped: the size is 0
+            */
+            return allChannels.size() != 0 ? allChannels.size() - 1 : 0;
+        }
     }
 
     private static class PipelineFactory implements ChannelPipelineFactory

Reply via email to