Partition intra-cluster message streams by size, not type

patch by ariel; reviewed by benedict for CASSANDRA-8789
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/144644bb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/144644bb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/144644bb Branch: refs/heads/trunk Commit: 144644bbf77a546c45db384e2dbc18e13f65c9ce Parents: 0e5e7d9 Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Wed Mar 18 10:44:22 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Mar 18 10:44:22 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/metrics/ConnectionMetrics.java | 61 ++++++++++++-------- .../org/apache/cassandra/net/MessageOut.java | 36 +++++++++++- .../apache/cassandra/net/MessagingService.java | 34 ++++++----- .../cassandra/net/MessagingServiceMBean.java | 25 ++++---- .../net/OutboundTcpConnectionPool.java | 44 +++++++------- .../org/apache/cassandra/tools/NodeTool.java | 32 +++++----- .../apache/cassandra/utils/StatusLogger.java | 14 ++--- 8 files changed, 152 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2661723..ae98f56 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789) * Add nodetool command to validate all sstables in a node (CASSANDRA-5791) * Add WriteFailureException to native protocol, notify coordinator of write failures (CASSANDRA-8592) http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java 
b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java index 60020b3..73dd0bd 100644 --- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java @@ -38,16 +38,19 @@ public class ConnectionMetrics public static final Meter totalTimeouts = Metrics.meter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalTimeouts", null)); public final String address; - /** Pending tasks for Command(Mutations, Read etc) TCP Connections */ - public final Gauge<Integer> commandPendingTasks; - /** Completed tasks for Command(Mutations, Read etc) TCP Connections */ - public final Gauge<Long> commandCompletedTasks; - /** Dropped tasks for Command(Mutations, Read etc) TCP Connections */ - public final Gauge<Long> commandDroppedTasks; - /** Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections */ - public final Gauge<Integer> responsePendingTasks; - /** Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections */ - public final Gauge<Long> responseCompletedTasks; + /** Pending tasks for large message TCP Connections */ + public final Gauge<Integer> largeMessagePendingTasks; + /** Completed tasks for large message TCP Connections */ + public final Gauge<Long> largeMessageCompletedTasks; + /** Dropped tasks for large message TCP Connections */ + public final Gauge<Long> largeMessageDroppedTasks; + /** Pending tasks for small message TCP Connections */ + public final Gauge<Integer> smallMessagePendingTasks; + /** Completed tasks for small message TCP Connections */ + public final Gauge<Long> smallMessageCompletedTasks; + /** Dropped tasks for small message TCP Connections */ + public final Gauge<Long> smallMessageDroppedTasks; + /** Number of timeouts for specific IP */ public final Meter timeouts; @@ -66,39 +69,46 @@ public class ConnectionMetrics factory = new DefaultNameFactory("Connection", address); - commandPendingTasks = Metrics.register(factory.createMetricName("CommandPendingTasks"), new 
Gauge<Integer>() + largeMessagePendingTasks = Metrics.register(factory.createMetricName("LargeMessagePendingTasks"), new Gauge<Integer>() { public Integer getValue() { - return connectionPool.cmdCon.getPendingMessages(); + return connectionPool.largeMessages.getPendingMessages(); } }); - commandCompletedTasks = Metrics.register(factory.createMetricName("CommandCompletedTasks"), new Gauge<Long>() + largeMessageCompletedTasks = Metrics.register(factory.createMetricName("LargeMessageCompletedTasks"), new Gauge<Long>() { public Long getValue() { - return connectionPool.cmdCon.getCompletedMesssages(); + return connectionPool.largeMessages.getCompletedMesssages(); } }); - commandDroppedTasks = Metrics.register(factory.createMetricName("CommandDroppedTasks"), new Gauge<Long>() + largeMessageDroppedTasks = Metrics.register(factory.createMetricName("LargeMessageDroppedTasks"), new Gauge<Long>() { public Long getValue() { - return connectionPool.cmdCon.getDroppedMessages(); + return connectionPool.largeMessages.getDroppedMessages(); } }); - responsePendingTasks = Metrics.register(factory.createMetricName("ResponsePendingTasks"), new Gauge<Integer>() + smallMessagePendingTasks = Metrics.register(factory.createMetricName("SmallMessagePendingTasks"), new Gauge<Integer>() { public Integer getValue() { - return connectionPool.ackCon.getPendingMessages(); + return connectionPool.smallMessages.getPendingMessages(); + } + }); + smallMessageCompletedTasks = Metrics.register(factory.createMetricName("SmallMessageCompletedTasks"), new Gauge<Long>() + { + public Long getValue() + { + return connectionPool.smallMessages.getCompletedMesssages(); } }); - responseCompletedTasks = Metrics.register(factory.createMetricName("ResponseCompletedTasks"), new Gauge<Long>() + smallMessageDroppedTasks = Metrics.register(factory.createMetricName("SmallMessageDroppedTasks"), new Gauge<Long>() { public Long getValue() { - return connectionPool.ackCon.getCompletedMesssages(); + return 
connectionPool.smallMessages.getDroppedMessages(); } }); timeouts = Metrics.meter(factory.createMetricName("Timeouts")); @@ -106,11 +116,12 @@ public class ConnectionMetrics public void release() { - Metrics.remove(factory.createMetricName("CommandPendingTasks")); - Metrics.remove(factory.createMetricName("CommandCompletedTasks")); - Metrics.remove(factory.createMetricName("CommandDroppedTasks")); - Metrics.remove(factory.createMetricName("ResponsePendingTasks")); - Metrics.remove(factory.createMetricName("ResponseCompletedTasks")); + Metrics.remove(factory.createMetricName("LargeMessagePendingTasks")); + Metrics.remove(factory.createMetricName("LargeMessageCompletedTasks")); + Metrics.remove(factory.createMetricName("LargeMessageDroppedTasks")); + Metrics.remove(factory.createMetricName("SmallMessagePendingTasks")); + Metrics.remove(factory.createMetricName("SmallMessageCompletedTasks")); + Metrics.remove(factory.createMetricName("SmallMessageDroppedTasks")); Metrics.remove(factory.createMetricName("Timeouts")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 357d798..28038b3 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -46,6 +46,8 @@ public class MessageOut<T> public final T payload; public final IVersionedSerializer<T> serializer; public final Map<String, byte[]> parameters; + private long payloadSize = -1; + private int payloadSizeVersion = -1; // we do support messages that just consist of a verb public MessageOut(MessagingService.Verb verb) @@ -86,7 +88,7 @@ public class MessageOut<T> return new MessageOut<T>(verb, payload, serializer, builder.build()); } - public Stage getStage() + private Stage getStage() { return 
MessagingService.verbStages.get(verb); } @@ -116,7 +118,7 @@ public class MessageOut<T> out.write(entry.getValue()); } - long longSize = payload == null ? 0 : serializer.serializedSize(payload, version); + long longSize = payloadSize(version); assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages out.writeInt((int) longSize); if (payload != null) @@ -136,10 +138,38 @@ public class MessageOut<T> size += entry.getValue().length; } - long longSize = payload == null ? 0 : serializer.serializedSize(payload, version); + long longSize = payloadSize(version); assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages size += TypeSizes.NATIVE.sizeof((int) longSize); size += longSize; return size; } + + /** + * Calculate the size of the payload of this message for the specified protocol version + * and memoize the result for the specified protocol version. Memoization only covers the protocol + * version of the first invocation. + * + * It is not safe to call payloadSize concurrently from multiple threads unless it has already been invoked + * once from a single thread and there is a happens before relationship between that invocation and other + * threads concurrently invoking payloadSize. + * + * For instance it would be safe to invokePayload size to make a decision in the thread that created the message + * and then hand it off to other threads via a thread-safe queue, volatile write, or synchronized/ReentrantLock. + * @param version Protocol version to use when calculating payload size + * @return Size of the payload of this message in bytes + */ + public long payloadSize(int version) + { + if (payloadSize == -1) + { + payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version); + payloadSizeVersion = version; + } + else if (payloadSizeVersion != version) + { + return payload == null ? 
0 : serializer.serializedSize(payload, version); + } + return payloadSize; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index fb699e4..65b93ce 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -540,7 +540,7 @@ public final class MessagingService implements MessagingServiceMBean cp.waitForStarted(); return cp; } - + public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) { @@ -964,52 +964,60 @@ public final class MessagingService implements MessagingServiceMBean } } - public Map<String, Integer> getCommandPendingTasks() + public Map<String, Integer> getLargeMessagePendingTasks() { Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size()); for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getPendingMessages()); + pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getPendingMessages()); return pendingTasks; } - public int getCommandPendingTasks(InetAddress address) + public int getLargeMessagePendingTasks(InetAddress address) { OutboundTcpConnectionPool connection = connectionManagers.get(address); - return connection == null ? 0 : connection.cmdCon.getPendingMessages(); + return connection == null ? 
0 : connection.largeMessages.getPendingMessages(); } - public Map<String, Long> getCommandCompletedTasks() + public Map<String, Long> getLargeMessageCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size()); for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getCompletedMesssages()); + completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getCompletedMesssages()); return completedTasks; } - public Map<String, Long> getCommandDroppedTasks() + public Map<String, Long> getLargeMessageDroppedTasks() { Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size()); for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages()); + droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getDroppedMessages()); return droppedTasks; } - public Map<String, Integer> getResponsePendingTasks() + public Map<String, Integer> getSmallMessagePendingTasks() { Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size()); for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().ackCon.getPendingMessages()); + pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getPendingMessages()); return pendingTasks; } - public Map<String, Long> getResponseCompletedTasks() + public Map<String, Long> getSmallMessageCompletedTasks() { Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size()); for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().ackCon.getCompletedMesssages()); + completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getCompletedMesssages()); return completedTasks; } + public Map<String, Long> getSmallMessageDroppedTasks() + { + Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size()); + for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) + droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getDroppedMessages()); + return droppedTasks; + } + public Map<String, Integer> getDroppedMessages() { Map<String, Integer> map = new HashMap<String, Integer>(droppedMessages.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/MessagingServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java index 1d00656..f1b418c 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -29,29 +29,34 @@ import java.util.Map; public interface MessagingServiceMBean { /** - * Pending tasks for Command(Mutations, Read etc) TCP Connections + * Pending tasks for large message TCP Connections */ - public Map<String, Integer> getCommandPendingTasks(); + public Map<String, Integer> getLargeMessagePendingTasks(); /** - * Completed tasks for Command(Mutations, Read etc) TCP Connections + * Completed tasks for large message) TCP Connections */ - public Map<String, Long> getCommandCompletedTasks(); + public Map<String, Long> getLargeMessageCompletedTasks(); /** - * Dropped tasks for Command(Mutations, Read etc) TCP Connections + * Dropped tasks for large message TCP Connections */ - public Map<String, Long> getCommandDroppedTasks(); + public Map<String, Long> 
getLargeMessageDroppedTasks(); /** - * Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections + * Pending tasks for small message TCP Connections */ - public Map<String, Integer> getResponsePendingTasks(); + public Map<String, Integer> getSmallMessagePendingTasks(); /** - * Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections + * Completed tasks for small message TCP Connections */ - public Map<String, Long> getResponseCompletedTasks(); + public Map<String, Long> getSmallMessageCompletedTasks(); + + /** + * Dropped tasks for small message TCP Connections + */ + public Map<String, Long> getSmallMessageDroppedTasks(); /** * dropped message counts for server lifetime http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 6395aea..855763e 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -36,11 +36,14 @@ import org.apache.cassandra.utils.FBUtilities; public class OutboundTcpConnectionPool { + public static final long LARGE_MESSAGE_THRESHOLD = + Long.getLong(Config.PROPERTY_PREFIX + "OTCP_LARGE_MESSAGE_THRESHOLD", 1024 * 64); + // pointer for the real Address. private final InetAddress id; private final CountDownLatch started; - public final OutboundTcpConnection cmdCon; - public final OutboundTcpConnection ackCon; + public final OutboundTcpConnection smallMessages; + public final OutboundTcpConnection largeMessages; // pointer to the reset Address. 
private InetAddress resetEndpoint; private ConnectionMetrics metrics; @@ -51,8 +54,8 @@ public class OutboundTcpConnectionPool resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp); started = new CountDownLatch(1); - cmdCon = new OutboundTcpConnection(this); - ackCon = new OutboundTcpConnection(this); + smallMessages = new OutboundTcpConnection(this); + largeMessages = new OutboundTcpConnection(this); } /** @@ -61,21 +64,20 @@ public class OutboundTcpConnectionPool */ OutboundTcpConnection getConnection(MessageOut msg) { - Stage stage = msg.getStage(); - return stage == Stage.REQUEST_RESPONSE || stage == Stage.INTERNAL_RESPONSE || stage == Stage.GOSSIP - ? ackCon - : cmdCon; + return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD + ? largeMessages + : smallMessages; } void reset() { - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages }) conn.closeSocket(false); } public void resetToNewerVersion(int version) { - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages }) { if (version > conn.getTargetVersion()) conn.softCloseSocket(); @@ -91,7 +93,7 @@ public class OutboundTcpConnectionPool { SystemKeyspace.updatePreferredIP(id, remoteEP); resetEndpoint = remoteEP; - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages }) conn.softCloseSocket(); // release previous metrics and create new one with reset address @@ -163,17 +165,17 @@ public class OutboundTcpConnectionPool } return true; } - + public void start() { - cmdCon.start(); - ackCon.start(); + smallMessages.start(); + largeMessages.start(); metrics = new ConnectionMetrics(id, this); - + started.countDown(); } - + 
public void waitForStarted() { if (started.getCount() == 0) @@ -197,11 +199,11 @@ public class OutboundTcpConnectionPool public void close() { // these null guards are simply for tests - if (ackCon != null) - ackCon.closeSocket(true); - if (cmdCon != null) - cmdCon.closeSocket(true); - + if (largeMessages != null) + largeMessages.closeSocket(true); + if (smallMessages != null) + smallMessages.closeSocket(true); + metrics.release(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 1fadd14..d9cb5d9 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -255,7 +255,7 @@ public class NodeTool try (NodeProbe probe = connect()) { execute(probe); - } + } catch (IOException e) { throw new RuntimeException("Error while closing JMX connection", e); @@ -510,20 +510,20 @@ public class NodeTool try { ownerships = probe.effectiveOwnership(keyspace); - } + } catch (IllegalStateException ex) { ownerships = probe.getOwnership(); errors.append("Note: " + ex.getMessage() + "%n"); showEffectiveOwnership = false; - } + } catch (IllegalArgumentException ex) { System.out.printf("%nError: " + ex.getMessage() + "%n"); return; } - + System.out.println(); for (Entry<String, SetHostStat> entry : getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet()) printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(),showEffectiveOwnership); @@ -673,20 +673,20 @@ public class NodeTool long completed; pending = 0; - for (int n : ms.getCommandPendingTasks().values()) + for (int n : ms.getLargeMessagePendingTasks().values()) pending += n; completed = 0; - for (long n : ms.getCommandCompletedTasks().values()) + for (long n : 
ms.getLargeMessageCompletedTasks().values()) completed += n; - System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed); + System.out.printf("%-25s%10s%10s%15s%n", "Large messages", "n/a", pending, completed); pending = 0; - for (int n : ms.getResponsePendingTasks().values()) + for (int n : ms.getSmallMessagePendingTasks().values()) pending += n; completed = 0; - for (long n : ms.getResponseCompletedTasks().values()) + for (long n : ms.getSmallMessageCompletedTasks().values()) completed += n; - System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed); + System.out.printf("%-25s%10s%10s%15s%n", "Small messages", "n/a", pending, completed); } } } @@ -2173,7 +2173,7 @@ public class NodeTool unreachableNodes = probe.getUnreachableNodes(); hostIDMap = probe.getHostIdMap(); epSnitchInfo = probe.getEndpointSnitchInfoProxy(); - + StringBuffer errors = new StringBuffer(); Map<InetAddress, Float> ownerships = null; @@ -2226,9 +2226,9 @@ public class NodeTool printNode(endpoint.getHostAddress(), owns, tokens, hasEffectiveOwns, isTokenPerNode); } } - + System.out.printf("%n" + errors.toString()); - + } private void findMaxAddressLength(Map<String, SetHostStat> dcs) @@ -2314,7 +2314,7 @@ public class NodeTool } } - private static Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, boolean resolveIp, + private static Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, boolean resolveIp, Map<String, String> tokenToEndpoint, Map<InetAddress, Float> ownerships) { @@ -2699,7 +2699,7 @@ public class NodeTool probe.truncateHints(endpoint); } } - + @Command(name = "setlogginglevel", description = "Set the log level threshold for a given class. 
If both class and level are empty/null, it will reset to the initial configuration") public static class SetLoggingLevel extends NodeToolCmd { @@ -2714,7 +2714,7 @@ public class NodeTool probe.setLoggingLevel(classQualifier, level); } } - + @Command(name = "getlogginglevels", description = "Get the runtime logging levels") public static class GetLoggingLevels extends NodeToolCmd { http://git-wip-us.apache.org/repos/asf/cassandra/blob/144644bb/src/java/org/apache/cassandra/utils/StatusLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java index a1d5e18..32470e8 100644 --- a/src/java/org/apache/cassandra/utils/StatusLogger.java +++ b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -64,18 +64,18 @@ public class StatusLogger // one offs logger.info(String.format("%-25s%10s%10s", "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks())); - int pendingCommands = 0; - for (int n : MessagingService.instance().getCommandPendingTasks().values()) + int pendingLargeMessages = 0; + for (int n : MessagingService.instance().getLargeMessagePendingTasks().values()) { - pendingCommands += n; + pendingLargeMessages += n; } - int pendingResponses = 0; - for (int n : MessagingService.instance().getResponsePendingTasks().values()) + int pendingSmallMessages = 0; + for (int n : MessagingService.instance().getSmallMessagePendingTasks().values()) { - pendingResponses += n; + pendingSmallMessages += n; } logger.info(String.format("%-25s%10s%10s", - "MessagingService", "n/a", pendingCommands + "/" + pendingResponses)); + "MessagingService", "n/a", pendingLargeMessages + "/" + pendingSmallMessages)); // Global key/row cache information AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = CacheService.instance.keyCache;