Partition intra-cluster message streams by size, not type

patch by ariel; reviewed by benedict for CASSANDRA-8789


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/144644bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/144644bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/144644bb

Branch: refs/heads/trunk
Commit: 144644bbf77a546c45db384e2dbc18e13f65c9ce
Parents: 0e5e7d9
Author: Ariel Weisberg <ariel.weisb...@datastax.com>
Authored: Wed Mar 18 10:44:22 2015 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Wed Mar 18 10:44:22 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/metrics/ConnectionMetrics.java    | 61 ++++++++++++--------
 .../org/apache/cassandra/net/MessageOut.java    | 36 +++++++++++-
 .../apache/cassandra/net/MessagingService.java  | 34 ++++++-----
 .../cassandra/net/MessagingServiceMBean.java    | 25 ++++----
 .../net/OutboundTcpConnectionPool.java          | 44 +++++++-------
 .../org/apache/cassandra/tools/NodeTool.java    | 32 +++++-----
 .../apache/cassandra/utils/StatusLogger.java    | 14 ++---
 8 files changed, 152 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2661723..ae98f56 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
  * Add nodetool command to validate all sstables in a node (CASSANDRA-5791)
  * Add WriteFailureException to native protocol, notify coordinator of
    write failures (CASSANDRA-8592)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java 
b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index 60020b3..73dd0bd 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -38,16 +38,19 @@ public class ConnectionMetrics
     public static final Meter totalTimeouts = 
Metrics.meter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalTimeouts", 
null));
 
     public final String address;
-    /** Pending tasks for Command(Mutations, Read etc) TCP Connections */
-    public final Gauge<Integer> commandPendingTasks;
-    /** Completed tasks for Command(Mutations, Read etc) TCP Connections */
-    public final Gauge<Long> commandCompletedTasks;
-    /** Dropped tasks for Command(Mutations, Read etc) TCP Connections */
-    public final Gauge<Long> commandDroppedTasks;
-    /** Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections */
-    public final Gauge<Integer> responsePendingTasks;
-    /** Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections */
-    public final Gauge<Long> responseCompletedTasks;
+    /** Pending tasks for large message TCP Connections */
+    public final Gauge<Integer> largeMessagePendingTasks;
+    /** Completed tasks for large message TCP Connections */
+    public final Gauge<Long> largeMessageCompletedTasks;
+    /** Dropped tasks for large message TCP Connections */
+    public final Gauge<Long> largeMessageDroppedTasks;
+    /** Pending tasks for small message TCP Connections */
+    public final Gauge<Integer> smallMessagePendingTasks;
+    /** Completed tasks for small message TCP Connections */
+    public final Gauge<Long> smallMessageCompletedTasks;
+    /** Dropped tasks for small message TCP Connections */
+    public final Gauge<Long> smallMessageDroppedTasks;
+
     /** Number of timeouts for specific IP */
     public final Meter timeouts;
 
@@ -66,39 +69,46 @@ public class ConnectionMetrics
 
         factory = new DefaultNameFactory("Connection", address);
 
-        commandPendingTasks = 
Metrics.register(factory.createMetricName("CommandPendingTasks"), new 
Gauge<Integer>()
+        largeMessagePendingTasks = 
Metrics.register(factory.createMetricName("LargeMessagePendingTasks"), new 
Gauge<Integer>()
         {
             public Integer getValue()
             {
-                return connectionPool.cmdCon.getPendingMessages();
+                return connectionPool.largeMessages.getPendingMessages();
             }
         });
-        commandCompletedTasks = 
Metrics.register(factory.createMetricName("CommandCompletedTasks"), new 
Gauge<Long>()
+        largeMessageCompletedTasks = 
Metrics.register(factory.createMetricName("LargeMessageCompletedTasks"), new 
Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.cmdCon.getCompletedMesssages();
+                return connectionPool.largeMessages.getCompletedMesssages();
             }
         });
-        commandDroppedTasks = 
Metrics.register(factory.createMetricName("CommandDroppedTasks"), new 
Gauge<Long>()
+        largeMessageDroppedTasks = 
Metrics.register(factory.createMetricName("LargeMessageDroppedTasks"), new 
Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.cmdCon.getDroppedMessages();
+                return connectionPool.largeMessages.getDroppedMessages();
             }
         });
-        responsePendingTasks = 
Metrics.register(factory.createMetricName("ResponsePendingTasks"), new 
Gauge<Integer>()
+        smallMessagePendingTasks = 
Metrics.register(factory.createMetricName("SmallMessagePendingTasks"), new 
Gauge<Integer>()
         {
             public Integer getValue()
             {
-                return connectionPool.ackCon.getPendingMessages();
+                return connectionPool.smallMessages.getPendingMessages();
+            }
+        });
+        smallMessageCompletedTasks = 
Metrics.register(factory.createMetricName("SmallMessageCompletedTasks"), new 
Gauge<Long>()
+        {
+            public Long getValue()
+            {
+                return connectionPool.smallMessages.getCompletedMesssages();
             }
         });
-        responseCompletedTasks = 
Metrics.register(factory.createMetricName("ResponseCompletedTasks"), new 
Gauge<Long>()
+        smallMessageDroppedTasks = 
Metrics.register(factory.createMetricName("SmallMessageDroppedTasks"), new 
Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.ackCon.getCompletedMesssages();
+                return connectionPool.smallMessages.getDroppedMessages();
             }
         });
         timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
@@ -106,11 +116,12 @@ public class ConnectionMetrics
 
     public void release()
     {
-        Metrics.remove(factory.createMetricName("CommandPendingTasks"));
-        Metrics.remove(factory.createMetricName("CommandCompletedTasks"));
-        Metrics.remove(factory.createMetricName("CommandDroppedTasks"));
-        Metrics.remove(factory.createMetricName("ResponsePendingTasks"));
-        Metrics.remove(factory.createMetricName("ResponseCompletedTasks"));
+        Metrics.remove(factory.createMetricName("LargeMessagePendingTasks"));
+        Metrics.remove(factory.createMetricName("LargeMessageCompletedTasks"));
+        Metrics.remove(factory.createMetricName("LargeMessageDroppedTasks"));
+        Metrics.remove(factory.createMetricName("SmallMessagePendingTasks"));
+        Metrics.remove(factory.createMetricName("SmallMessageCompletedTasks"));
+        Metrics.remove(factory.createMetricName("SmallMessageDroppedTasks"));
         Metrics.remove(factory.createMetricName("Timeouts"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java 
b/src/java/org/apache/cassandra/net/MessageOut.java
index 357d798..28038b3 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -46,6 +46,8 @@ public class MessageOut<T>
     public final T payload;
     public final IVersionedSerializer<T> serializer;
     public final Map<String, byte[]> parameters;
+    private long payloadSize = -1;
+    private int payloadSizeVersion = -1;
 
     // we do support messages that just consist of a verb
     public MessageOut(MessagingService.Verb verb)
@@ -86,7 +88,7 @@ public class MessageOut<T>
         return new MessageOut<T>(verb, payload, serializer, builder.build());
     }
 
-    public Stage getStage()
+    private Stage getStage()
     {
         return MessagingService.verbStages.get(verb);
     }
@@ -116,7 +118,7 @@ public class MessageOut<T>
             out.write(entry.getValue());
         }
 
-        long longSize = payload == null ? 0 : 
serializer.serializedSize(payload, version);
+        long longSize = payloadSize(version);
         assert longSize <= Integer.MAX_VALUE; // larger values are supported 
in sstables but not messages
         out.writeInt((int) longSize);
         if (payload != null)
@@ -136,10 +138,38 @@ public class MessageOut<T>
             size += entry.getValue().length;
         }
 
-        long longSize = payload == null ? 0 : 
serializer.serializedSize(payload, version);
+        long longSize = payloadSize(version);
         assert longSize <= Integer.MAX_VALUE; // larger values are supported 
in sstables but not messages
         size += TypeSizes.NATIVE.sizeof((int) longSize);
         size += longSize;
         return size;
     }
+
+    /**
+     * Calculate the size of the payload of this message for the specified 
protocol version
+     * and memoize the result for the specified protocol version. Memoization 
only covers the protocol
+     * version of the first invocation.
+     *
+     * It is not safe to call payloadSize concurrently from multiple threads 
unless it has already been invoked
+     * once from a single thread and there is a happens before relationship 
between that invocation and other
+     * threads concurrently invoking payloadSize.
+     *
+     * For instance it would be safe to invoke payloadSize to make a decision 
in the thread that created the message
+     * and then hand it off to other threads via a thread-safe queue, volatile 
write, or synchronized/ReentrantLock.
+     * @param version Protocol version to use when calculating payload size
+     * @return Size of the payload of this message in bytes
+     */
+    public long payloadSize(int version)
+    {
+        if (payloadSize == -1)
+        {
+            payloadSize = payload == null ? 0 : 
serializer.serializedSize(payload, version);
+            payloadSizeVersion = version;
+        }
+        else if (payloadSizeVersion != version)
+        {
+            return payload == null ? 0 : serializer.serializedSize(payload, 
version);
+        }
+        return payloadSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index fb699e4..65b93ce 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -540,7 +540,7 @@ public final class MessagingService implements 
MessagingServiceMBean
         cp.waitForStarted();
         return cp;
     }
-    
+
 
     public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
     {
@@ -964,52 +964,60 @@ public final class MessagingService implements 
MessagingServiceMBean
         }
     }
 
-    public Map<String, Integer> getCommandPendingTasks()
+    public Map<String, Integer> getLargeMessagePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(connectionManagers.size());
         for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().cmdCon.getPendingMessages());
+            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getPendingMessages());
         return pendingTasks;
     }
 
-    public int getCommandPendingTasks(InetAddress address)
+    public int getLargeMessagePendingTasks(InetAddress address)
     {
         OutboundTcpConnectionPool connection = connectionManagers.get(address);
-        return connection == null ? 0 : connection.cmdCon.getPendingMessages();
+        return connection == null ? 0 : 
connection.largeMessages.getPendingMessages();
     }
 
-    public Map<String, Long> getCommandCompletedTasks()
+    public Map<String, Long> getLargeMessageCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, 
Long>(connectionManagers.size());
         for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().cmdCon.getCompletedMesssages());
+            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getCompletedMesssages());
         return completedTasks;
     }
 
-    public Map<String, Long> getCommandDroppedTasks()
+    public Map<String, Long> getLargeMessageDroppedTasks()
     {
         Map<String, Long> droppedTasks = new HashMap<String, 
Long>(connectionManagers.size());
         for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().cmdCon.getDroppedMessages());
+            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getDroppedMessages());
         return droppedTasks;
     }
 
-    public Map<String, Integer> getResponsePendingTasks()
+    public Map<String, Integer> getSmallMessagePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(connectionManagers.size());
         for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().ackCon.getPendingMessages());
+            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getPendingMessages());
         return pendingTasks;
     }
 
-    public Map<String, Long> getResponseCompletedTasks()
+    public Map<String, Long> getSmallMessageCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, 
Long>(connectionManagers.size());
         for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().ackCon.getCompletedMesssages());
+            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getCompletedMesssages());
         return completedTasks;
     }
 
+    public Map<String, Long> getSmallMessageDroppedTasks()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getDroppedMessages());
+        return droppedTasks;
+    }
+
     public Map<String, Integer> getDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, 
Integer>(droppedMessages.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java 
b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 1d00656..f1b418c 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -29,29 +29,34 @@ import java.util.Map;
 public interface MessagingServiceMBean
 {
     /**
-     * Pending tasks for Command(Mutations, Read etc) TCP Connections
+     * Pending tasks for large message TCP Connections
      */
-    public Map<String, Integer> getCommandPendingTasks();
+    public Map<String, Integer> getLargeMessagePendingTasks();
 
     /**
-     * Completed tasks for Command(Mutations, Read etc) TCP Connections
+     * Completed tasks for large message TCP Connections
      */
-    public Map<String, Long> getCommandCompletedTasks();
+    public Map<String, Long> getLargeMessageCompletedTasks();
 
     /**
-     * Dropped tasks for Command(Mutations, Read etc) TCP Connections
+     * Dropped tasks for large message TCP Connections
      */
-    public Map<String, Long> getCommandDroppedTasks();
+    public Map<String, Long> getLargeMessageDroppedTasks();
 
     /**
-     * Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
+     * Pending tasks for small message TCP Connections
      */
-    public Map<String, Integer> getResponsePendingTasks();
+    public Map<String, Integer> getSmallMessagePendingTasks();
 
     /**
-     * Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections
+     * Completed tasks for small message TCP Connections
      */
-    public Map<String, Long> getResponseCompletedTasks();
+    public Map<String, Long> getSmallMessageCompletedTasks();
+
+    /**
+     * Dropped tasks for small message TCP Connections
+     */
+    public Map<String, Long> getSmallMessageDroppedTasks();
 
     /**
      * dropped message counts for server lifetime

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 6395aea..855763e 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -36,11 +36,14 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class OutboundTcpConnectionPool
 {
+    public static final long LARGE_MESSAGE_THRESHOLD =
+            Long.getLong(Config.PROPERTY_PREFIX + 
"OTCP_LARGE_MESSAGE_THRESHOLD", 1024 * 64);
+
     // pointer for the real Address.
     private final InetAddress id;
     private final CountDownLatch started;
-    public final OutboundTcpConnection cmdCon;
-    public final OutboundTcpConnection ackCon;
+    public final OutboundTcpConnection smallMessages;
+    public final OutboundTcpConnection largeMessages;
     // pointer to the reset Address.
     private InetAddress resetEndpoint;
     private ConnectionMetrics metrics;
@@ -51,8 +54,8 @@ public class OutboundTcpConnectionPool
         resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
         started = new CountDownLatch(1);
 
-        cmdCon = new OutboundTcpConnection(this);
-        ackCon = new OutboundTcpConnection(this);
+        smallMessages = new OutboundTcpConnection(this);
+        largeMessages = new OutboundTcpConnection(this);
     }
 
     /**
@@ -61,21 +64,20 @@ public class OutboundTcpConnectionPool
      */
     OutboundTcpConnection getConnection(MessageOut msg)
     {
-        Stage stage = msg.getStage();
-        return stage == Stage.REQUEST_RESPONSE || stage == 
Stage.INTERNAL_RESPONSE || stage == Stage.GOSSIP
-               ? ackCon
-               : cmdCon;
+        return msg.payloadSize(smallMessages.getTargetVersion()) > 
LARGE_MESSAGE_THRESHOLD
+               ? largeMessages
+               : smallMessages;
     }
 
     void reset()
     {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
smallMessages, largeMessages })
             conn.closeSocket(false);
     }
 
     public void resetToNewerVersion(int version)
     {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
smallMessages, largeMessages })
         {
             if (version > conn.getTargetVersion())
                 conn.softCloseSocket();
@@ -91,7 +93,7 @@ public class OutboundTcpConnectionPool
     {
         SystemKeyspace.updatePreferredIP(id, remoteEP);
         resetEndpoint = remoteEP;
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
smallMessages, largeMessages })
             conn.softCloseSocket();
 
         // release previous metrics and create new one with reset address
@@ -163,17 +165,17 @@ public class OutboundTcpConnectionPool
         }
         return true;
     }
-    
+
     public void start()
     {
-        cmdCon.start();
-        ackCon.start();
+        smallMessages.start();
+        largeMessages.start();
 
         metrics = new ConnectionMetrics(id, this);
-        
+
         started.countDown();
     }
-    
+
     public void waitForStarted()
     {
         if (started.getCount() == 0)
@@ -197,11 +199,11 @@ public class OutboundTcpConnectionPool
     public void close()
     {
         // these null guards are simply for tests
-        if (ackCon != null)
-            ackCon.closeSocket(true);
-        if (cmdCon != null)
-            cmdCon.closeSocket(true);
-        
+        if (largeMessages != null)
+            largeMessages.closeSocket(true);
+        if (smallMessages != null)
+            smallMessages.closeSocket(true);
+
         metrics.release();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index 1fadd14..d9cb5d9 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -255,7 +255,7 @@ public class NodeTool
             try (NodeProbe probe = connect())
             {
                 execute(probe);
-            } 
+            }
             catch (IOException e)
             {
                 throw new RuntimeException("Error while closing JMX 
connection", e);
@@ -510,20 +510,20 @@ public class NodeTool
             try
             {
                 ownerships = probe.effectiveOwnership(keyspace);
-            } 
+            }
             catch (IllegalStateException ex)
             {
                 ownerships = probe.getOwnership();
                 errors.append("Note: " + ex.getMessage() + "%n");
                 showEffectiveOwnership = false;
-            } 
+            }
             catch (IllegalArgumentException ex)
             {
                 System.out.printf("%nError: " + ex.getMessage() + "%n");
                 return;
             }
 
-            
+
             System.out.println();
             for (Entry<String, SetHostStat> entry : getOwnershipByDc(probe, 
resolveIp, tokensToEndpoints, ownerships).entrySet())
                 printDc(probe, format, entry.getKey(), endpointsToTokens, 
entry.getValue(),showEffectiveOwnership);
@@ -673,20 +673,20 @@ public class NodeTool
                 long completed;
 
                 pending = 0;
-                for (int n : ms.getCommandPendingTasks().values())
+                for (int n : ms.getLargeMessagePendingTasks().values())
                     pending += n;
                 completed = 0;
-                for (long n : ms.getCommandCompletedTasks().values())
+                for (long n : ms.getLargeMessageCompletedTasks().values())
                     completed += n;
-                System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", 
pending, completed);
+                System.out.printf("%-25s%10s%10s%15s%n", "Large messages", 
"n/a", pending, completed);
 
                 pending = 0;
-                for (int n : ms.getResponsePendingTasks().values())
+                for (int n : ms.getSmallMessagePendingTasks().values())
                     pending += n;
                 completed = 0;
-                for (long n : ms.getResponseCompletedTasks().values())
+                for (long n : ms.getSmallMessageCompletedTasks().values())
                     completed += n;
-                System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", 
pending, completed);
+                System.out.printf("%-25s%10s%10s%15s%n", "Small messages", 
"n/a", pending, completed);
             }
         }
     }
@@ -2173,7 +2173,7 @@ public class NodeTool
             unreachableNodes = probe.getUnreachableNodes();
             hostIDMap = probe.getHostIdMap();
             epSnitchInfo = probe.getEndpointSnitchInfoProxy();
-            
+
             StringBuffer errors = new StringBuffer();
 
             Map<InetAddress, Float> ownerships = null;
@@ -2226,9 +2226,9 @@ public class NodeTool
                     printNode(endpoint.getHostAddress(), owns, tokens, 
hasEffectiveOwns, isTokenPerNode);
                 }
             }
-            
+
             System.out.printf("%n" + errors.toString());
-            
+
         }
 
         private void findMaxAddressLength(Map<String, SetHostStat> dcs)
@@ -2314,7 +2314,7 @@ public class NodeTool
         }
     }
 
-    private static Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, 
boolean resolveIp, 
+    private static Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, 
boolean resolveIp,
                                                              Map<String, 
String> tokenToEndpoint,
                                                              Map<InetAddress, 
Float> ownerships)
     {
@@ -2699,7 +2699,7 @@ public class NodeTool
                 probe.truncateHints(endpoint);
         }
     }
-    
+
     @Command(name = "setlogginglevel", description = "Set the log level 
threshold for a given class. If both class and level are empty/null, it will 
reset to the initial configuration")
     public static class SetLoggingLevel extends NodeToolCmd
     {
@@ -2714,7 +2714,7 @@ public class NodeTool
             probe.setLoggingLevel(classQualifier, level);
         }
     }
-    
+
     @Command(name = "getlogginglevels", description = "Get the runtime logging 
levels")
     public static class GetLoggingLevels extends NodeToolCmd
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java 
b/src/java/org/apache/cassandra/utils/StatusLogger.java
index a1d5e18..32470e8 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -64,18 +64,18 @@ public class StatusLogger
         // one offs
         logger.info(String.format("%-25s%10s%10s",
                                   "CompactionManager", 
CompactionManager.instance.getActiveCompactions(), 
CompactionManager.instance.getPendingTasks()));
-        int pendingCommands = 0;
-        for (int n : 
MessagingService.instance().getCommandPendingTasks().values())
+        int pendingLargeMessages = 0;
+        for (int n : 
MessagingService.instance().getLargeMessagePendingTasks().values())
         {
-            pendingCommands += n;
+            pendingLargeMessages += n;
         }
-        int pendingResponses = 0;
-        for (int n : 
MessagingService.instance().getResponsePendingTasks().values())
+        int pendingSmallMessages = 0;
+        for (int n : 
MessagingService.instance().getSmallMessagePendingTasks().values())
         {
-            pendingResponses += n;
+            pendingSmallMessages += n;
         }
         logger.info(String.format("%-25s%10s%10s",
-                                  "MessagingService", "n/a", pendingCommands + 
"/" + pendingResponses));
+                                  "MessagingService", "n/a", 
pendingLargeMessages + "/" + pendingSmallMessages));
 
         // Global key/row cache information
         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = 
CacheService.instance.keyCache;

Reply via email to