Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 d002c7edc -> 0e652e754
  refs/heads/trunk 080816637 -> 477c54c03


GCInspector more closely tracks GC; cassandra-stress and nodetool report it

patch by benedict; reviewed by tjake


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e652e75
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e652e75
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e652e75

Branch: refs/heads/cassandra-2.1
Commit: 0e652e7548edbfda2dbf47ce2272bb707a14a089
Parents: d002c7e
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Sun Sep 14 09:09:37 2014 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Sun Sep 14 09:09:37 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/service/GCInspector.java   |  83 ++++++++++++-
 .../cassandra/service/GCInspectorMXBean.java    |  25 ++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  11 +-
 .../org/apache/cassandra/tools/NodeTool.java    |  15 +++
 .../apache/cassandra/stress/StressAction.java   |   2 +-
 .../apache/cassandra/stress/StressMetrics.java  |  61 ++++++++--
 .../cassandra/stress/settings/SettingsPort.java |   3 +
 .../cassandra/stress/util/JmxCollector.java     | 119 +++++++++++++++++++
 .../apache/cassandra/stress/util/Timing.java    |  36 ++++--
 10 files changed, 331 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e029ab..4c39f5c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * GCInspector more closely tracks GC; cassandra-stress and nodetool report it
  * nodetool won't output bogus ownership info without a keyspace 
(CASSANDRA-7173)
  * Add human readable option to nodetool commands (CASSANDRA-5433)
  * Don't try to set repairedAt on old sstables (CASSANDRA-7913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java 
b/src/java/org/apache/cassandra/service/GCInspector.java
index d04b250..c4bffac 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -22,6 +22,8 @@ import java.lang.management.MemoryUsage;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -35,11 +37,55 @@ import com.sun.management.GarbageCollectionNotificationInfo;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.utils.StatusLogger;
 
-public class GCInspector implements NotificationListener
+public class GCInspector implements NotificationListener, GCInspectorMXBean
 {
+    public static final String MBEAN_NAME = 
"org.apache.cassandra.service:type=GCInspector";
     private static final Logger logger = 
LoggerFactory.getLogger(GCInspector.class);
-    final static long MIN_DURATION = 200;
-    final static long MIN_DURATION_TPSTATS = 1000;
+    final static long MIN_LOG_DURATION = 200;
+    final static long MIN_LOG_DURATION_TPSTATS = 1000;
+
+    static final class State // immutable running totals of GC activity for one window; a new instance replaces it on every update
+    {
+        final double maxRealTimeElapsed; // largest single extraElapsed folded in so far
+        final double totalRealTimeElapsed; // sum of every extraElapsed
+        final double sumSquaresRealTimeElapsed; // sum of extraElapsed^2
+        final double totalBytesReclaimed; // sum of every extraBytes
+        final double count; // number of samples folded in
+        final long startNanos; // System.nanoTime() captured when this window was opened
+        // builds the successor of prev with one more sample added; prev is not modified and its startNanos is carried over
+        State(double extraElapsed, double extraBytes, State prev)
+        {
+            this.totalRealTimeElapsed = prev.totalRealTimeElapsed + 
extraElapsed;
+            this.totalBytesReclaimed = prev.totalBytesReclaimed + extraBytes;
+            this.sumSquaresRealTimeElapsed = prev.sumSquaresRealTimeElapsed + 
(extraElapsed * extraElapsed);
+            this.startNanos = prev.startNanos;
+            this.count = prev.count + 1;
+            this.maxRealTimeElapsed = Math.max(prev.maxRealTimeElapsed, 
extraElapsed);
+        }
+        // opens an empty window: every total is zero and the window starts now
+        State()
+        {
+            count = maxRealTimeElapsed = sumSquaresRealTimeElapsed = 
totalRealTimeElapsed = totalBytesReclaimed = 0;
+            startNanos = System.nanoTime();
+        }
+    }
+
+    final AtomicReference<State> state = new AtomicReference<>(new State());
+
+    public GCInspector() // registers this instance with the platform MBean server under MBEAN_NAME
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e); // any registration failure is rethrown unchecked with its cause attached
+        }
+
+    }
 
     public static void register() throws Exception
     {
@@ -66,6 +112,7 @@ public class GCInspector implements NotificationListener
             StringBuilder sb = new StringBuilder();
             sb.append(info.getGcName()).append(" GC in 
").append(duration).append("ms.  ");
 
+            long bytes = 0;
             List<String> keys = new 
ArrayList<>(info.getGcInfo().getMemoryUsageBeforeGc().keySet());
             Collections.sort(keys);
             for (String key : keys)
@@ -79,16 +126,24 @@ public class GCInspector implements NotificationListener
                     sb.append(after.getUsed());
                     if (!key.equals(keys.get(keys.size() - 1)))
                         sb.append("; ");
+                    bytes += before.getUsed() - after.getUsed();
                 }
             }
 
+            while (true)
+            {
+                State prev = state.get();
+                if (state.compareAndSet(prev, new State(duration, bytes, 
prev)))
+                    break;
+            }
+
             String st = sb.toString();
-            if (duration > MIN_DURATION)
+            if (duration > MIN_LOG_DURATION)
                 logger.info(st);
             else if (logger.isDebugEnabled())
                 logger.debug(st);
 
-            if (duration > MIN_DURATION_TPSTATS)
+            if (duration > MIN_LOG_DURATION_TPSTATS)
                 StatusLogger.log();
 
             // if we just finished a full collection and we're still using a 
lot of memory, try to reduce the pressure
@@ -96,4 +151,22 @@ public class GCInspector implements NotificationListener
                 SSTableDeletingTask.rescheduleFailedTasks();
         }
     }
+
+    public State getTotalSinceLastCheck() // returns the totals accumulated so far and atomically swaps in a fresh, empty State
+    {
+        return state.getAndSet(new State());
+    }
+
+    public double[] getAndResetStats() // flattens the totals gathered since the previous call into an array and starts a new window
+    {
+        State state = getTotalSinceLastCheck(); // local snapshot; shadows the AtomicReference field of the same name
+        double[] r = new double[6];
+        r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
state.startNanos); // length of the window just closed, in ms
+        r[1] = state.maxRealTimeElapsed;
+        r[2] = state.totalRealTimeElapsed;
+        r[3] = state.sumSquaresRealTimeElapsed;
+        r[4] = state.totalBytesReclaimed;
+        r[5] = state.count;
+        return r;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/service/GCInspectorMXBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspectorMXBean.java 
b/src/java/org/apache/cassandra/service/GCInspectorMXBean.java
new file mode 100644
index 0000000..c26a67c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/GCInspectorMXBean.java
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+public interface GCInspectorMXBean
+{
+    // returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)), sum((gc real time (ms))^2), sum(gc bytes), count(gc) }; as the name says, each call also resets the counters
+    public double[] getAndResetStats();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3f3073d..203730a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -79,6 +79,7 @@ public class NodeProbe implements AutoCloseable
     private CompactionManagerMBean compactionProxy;
     private StorageServiceMBean ssProxy;
     private MemoryMXBean memProxy;
+    private GCInspectorMXBean gcProxy;
     private RuntimeMXBean runtimeProxy;
     private StreamManagerMBean streamProxy;
     public MessagingServiceMBean msProxy;
@@ -169,7 +170,10 @@ public class NodeProbe implements AutoCloseable
             spProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
StorageProxyMBean.class);
             name = new ObjectName(HintedHandOffManager.MBEAN_NAME);
             hhProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
HintedHandOffManagerMBean.class);
-        } catch (MalformedObjectNameException e)
+            name = new ObjectName(GCInspector.MBEAN_NAME);
+            gcProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
GCInspectorMXBean.class);
+        }
+        catch (MalformedObjectNameException e)
         {
             throw new RuntimeException(
                     "Invalid ObjectName? Please report this as a bug.", e);
@@ -374,6 +378,11 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
+    public double[] getAndResetGCStats() // forwards to the GCInspector MXBean proxy; see GCInspectorMXBean for the array layout
+    {
+        return gcProxy.getAndResetStats();
+    }
+
     public Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> 
getColumnFamilyStoreMBeanProxies()
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index e7d1404..731afcc 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -96,6 +96,7 @@ public class NodeTool
                 DisableGossip.class,
                 EnableHandoff.class,
                 EnableThrift.class,
+                GcStats.class,
                 GetCompactionThreshold.class,
                 GetCompactionThroughput.class,
                 GetStreamThroughput.class,
@@ -2361,6 +2362,20 @@ public class NodeTool
         }
     }
 
+    @Command(name = "gcstats", description = "Print GC Statistics")
+    public static class GcStats extends NodeTool.NodeToolCmd // prints the node's GC totals gathered since the previous call (fetching them resets them)
+    {
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            double[] stats = probe.getAndResetGCStats(); // { interval ms, max ms, sum ms, sum of squares ms^2, bytes, count }
+            double mean = stats[5] > 0 ? stats[2] / stats[5] : 0; // no collections in the interval: report 0 rather than NaN
+            double stdev = stats[5] > 0 ? Math.sqrt(Math.max(0, (stats[3] / stats[5]) - (mean * mean))) : 0; // clamp a variance that rounding pushed below zero
+            System.out.printf("%25s%25s%25s%25s%25s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections"); // six columns, wide enough for the 21-character labels
+            System.out.printf("%25.0f%25.0f%25.0f%25.0f%25.0f%25.0f%n", stats[0], stats[1], stats[2], stdev, stats[4] / (1 << 20), stats[5]); // %f because the stats are doubles (%d with a precision throws); bytes converted to MB to match the header
+        }
+    }
+
     @Command(name = "truncatehints", description = "Truncate all hints on the 
local node, or truncate hints for the endpoint(s) specified.")
     public static class TruncateHints extends NodeToolCmd
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index f697dd9..da32284 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -182,7 +182,7 @@ public class StressAction implements Runnable
         if (settings.rate.opRateTargetPerSecond > 0)
             rateLimiter = 
RateLimiter.create(settings.rate.opRateTargetPerSecond);
 
-        final StressMetrics metrics = new StressMetrics(output, 
settings.log.intervalMillis);
+        final StressMetrics metrics = new StressMetrics(output, 
settings.log.intervalMillis, settings);
 
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java 
b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index dd3b867..9e8e961 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -23,12 +23,16 @@ package org.apache.cassandra.stress;
 
 import java.io.PrintStream;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JmxCollector;
 import org.apache.cassandra.stress.util.Timing;
 import org.apache.cassandra.stress.util.TimingInterval;
 import org.apache.cassandra.stress.util.Uncertainty;
@@ -45,10 +49,33 @@ public class StressMetrics
     private final Uncertainty rowRateUncertainty = new Uncertainty();
     private final CountDownLatch stopped = new CountDownLatch(1);
     private final Timing timing = new Timing();
+    private final Callable<JmxCollector.GcStats> gcStatsCollector;
+    private volatile JmxCollector.GcStats totalGcStats;
 
-    public StressMetrics(PrintStream output, final long logIntervalMillis)
+    public StressMetrics(PrintStream output, final long logIntervalMillis, 
StressSettings settings)
     {
         this.output = output;
+        Callable<JmxCollector.GcStats> gcStatsCollector;
+        try
+        {
+            gcStatsCollector = new JmxCollector(settings.node.nodes, 
settings.port.jmxPort);
+            totalGcStats = new JmxCollector.GcStats(0);
+        }
+        catch (Throwable t)
+        {
+            t.printStackTrace();
+            System.err.println("Failed to connect over JMX; not collecting 
these stats");
+            totalGcStats = new JmxCollector.GcStats(Double.POSITIVE_INFINITY);
+            gcStatsCollector = new Callable<JmxCollector.GcStats>()
+            {
+                public JmxCollector.GcStats call() throws Exception
+                {
+                    return totalGcStats;
+                }
+            };
+        }
+        this.gcStatsCollector = gcStatsCollector;
+
         printHeader("", output);
         thread = tf.newThread(new Runnable()
         {
@@ -121,10 +148,10 @@ public class StressMetrics
 
     private void update() throws InterruptedException
     {
-        TimingInterval interval = timing.snapInterval();
-        if (interval.partitionCount != 0)
-            printRow("", interval, timing.getHistory(), rowRateUncertainty, 
output);
-        rowRateUncertainty.update(interval.adjustedRowRate());
+        Timing.TimingResult<JmxCollector.GcStats> result = 
timing.snap(gcStatsCollector);
+        if (result.timing.partitionCount != 0)
+            printRow("", result.timing, timing.getHistory(), result.extra, 
rowRateUncertainty, output);
+        rowRateUncertainty.update(result.timing.adjustedRowRate());
         if (timing.done())
             stop = true;
     }
@@ -132,15 +159,15 @@ public class StressMetrics
 
     // PRINT FORMATTING
 
-    public static final String HEADFORMAT = 
"%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s";
-    public static final String ROWFORMAT =  
"%-10d,%8.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f";
+    public static final String HEADFORMAT = 
"%-10s,%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%8s,%8s,%8s,%8s";
+    public static final String ROWFORMAT =  
"%-10d,%10.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
 
     private static void printHeader(String prefix, PrintStream output)
     {
-        output.println(prefix + String.format(HEADFORMAT, "total ops","adj 
row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr"));
+        output.println(prefix + String.format(HEADFORMAT, "total ops","adj 
row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr",
 "gc: #", "max ms", "sum ms", "sdv ms", "mb"));
     }
 
-    private static void printRow(String prefix, TimingInterval interval, 
TimingInterval total, Uncertainty opRateUncertainty, PrintStream output)
+    private static void printRow(String prefix, TimingInterval interval, 
TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty 
opRateUncertainty, PrintStream output)
     {
         output.println(prefix + String.format(ROWFORMAT,
                 total.operationCount,
@@ -155,7 +182,13 @@ public class StressMetrics
                 interval.rankLatency(0.999f),
                 interval.maxLatency(),
                 total.runTime() / 1000f,
-                opRateUncertainty.getUncertainty()));
+                opRateUncertainty.getUncertainty(),
+                gcStats.count,
+                gcStats.maxms,
+                gcStats.summs,
+                gcStats.sdvms,
+                gcStats.bytes / (1 << 20)
+        ));
     }
 
     public void summarise()
@@ -172,11 +205,16 @@ public class StressMetrics
         output.println(String.format("latency 99th percentile   : %.1f", 
history.rankLatency(0.99f)));
         output.println(String.format("latency 99.9th percentile : %.1f", 
history.rankLatency(0.999f)));
         output.println(String.format("latency max               : %.1f", 
history.maxLatency()));
+        output.println(String.format("total gc count            : %.0f", 
totalGcStats.count));
+        output.println(String.format("total gc mb               : %.0f", 
totalGcStats.bytes / (1 << 20)));
+        output.println(String.format("total gc time (s)         : %.0f", 
totalGcStats.summs / 1000));
+        output.println(String.format("avg gc time(ms)           : %.0f", 
totalGcStats.summs / totalGcStats.count));
+        output.println(String.format("stdev gc time(ms)         : %.0f", 
totalGcStats.sdvms));
         output.println("Total operation time      : " + 
DurationFormatUtils.formatDuration(
                 history.runTime(), "HH:mm:ss", true));
     }
 
-    public static final void summarise(List<String> ids, List<StressMetrics> 
summarise, PrintStream out)
+    public static void summarise(List<String> ids, List<StressMetrics> 
summarise, PrintStream out)
     {
         int idLen = 0;
         for (String id : ids)
@@ -187,6 +225,7 @@ public class StressMetrics
             printRow(String.format(formatstr, ids.get(i)),
                     summarise.get(i).timing.getHistory(),
                     summarise.get(i).timing.getHistory(),
+                    summarise.get(i).totalGcStats,
                     summarise.get(i).rowRateUncertainty,
                     out
             );

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
index 6f12f99..1e10e37 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
@@ -31,11 +31,13 @@ public class SettingsPort implements Serializable
 
     public final int nativePort;
     public final int thriftPort;
+    public final int jmxPort;
 
     public SettingsPort(PortOptions options)
     {
         nativePort = Integer.parseInt(options.nativePort.value());
         thriftPort = Integer.parseInt(options.thriftPort.value());
+        jmxPort = Integer.parseInt(options.jmxPort.value());
     }
 
     // Option Declarations
@@ -44,6 +46,7 @@ public class SettingsPort implements Serializable
     {
         final OptionSimple nativePort = new OptionSimple("native=", "[0-9]+", 
"9042", "Use this port for the Cassandra native protocol", false);
         final OptionSimple thriftPort = new OptionSimple("thrift=", "[0-9]+", 
"9160", "Use this port for the thrift protocol", false);
+        final OptionSimple jmxPort = new OptionSimple("jmx=", "[0-9]+", 
"7199", "Use this port for retrieving statistics over jmx", false);
 
         @Override
         public List<? extends Option> options()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java 
b/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java
new file mode 100644
index 0000000..9611b2a
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java
@@ -0,0 +1,119 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.tools.NodeProbe;
+
+public class JmxCollector implements Callable<JmxCollector.GcStats> // polls the GCInspector of every connected node over JMX and merges the results
+{
+    // Immutable GC summary: collection count, bytes reclaimed, and max / sum / sum-of-squares / stdev of GC real time in ms.
+    public static class GcStats
+    {
+        public final double count;
+        public final double bytes;
+        public final double maxms;
+        public final double summs;
+        public final double sumsqms;
+        public final double sdvms;
+        public GcStats(double count, double bytes, double maxms, double summs, double sumsqms)
+        {
+            this.count = count;
+            this.bytes = bytes;
+            this.maxms = maxms;
+            this.summs = summs;
+            this.sumsqms = sumsqms;
+            double mean = summs / count;
+            double stdev = Math.sqrt((sumsqms / count) - (mean * mean));
+            if (Double.isNaN(stdev)) // count == 0, non-finite inputs, or a variance rounded just below zero
+                stdev = 0;
+            this.sdvms = stdev;
+        }
+        public GcStats(double fill) // placeholder in which every input carries the same value
+        {
+            this(fill, fill, fill, fill, fill);
+        }
+        public static GcStats aggregate(List<GcStats> stats) // merges per-node summaries into one; an empty list yields all zeros
+        {
+            double count = 0, bytes = 0, maxms = 0, summs = 0, sumsqms = 0;
+            for (GcStats stat : stats)
+            {
+                count += stat.count;
+                bytes += stat.bytes;
+                maxms = Math.max(maxms, stat.maxms); // a maximum is combined with max; summing it overstated the longest GC
+                summs += stat.summs;
+                sumsqms += stat.sumsqms;
+            }
+            return new GcStats(count, bytes, maxms, summs, sumsqms);
+        }
+    }
+
+    final NodeProbe[] probes; // one JMX connection per host handed to the constructor
+
+    // TODO: should expand to whole cluster
+    public JmxCollector(List<String> hosts, int port) // connects to every host on the same JMX port; the first failure propagates as a RuntimeException
+    {
+        probes = new NodeProbe[hosts.size()];
+        for (int i = 0 ; i < hosts.size() ; i++)
+            probes[i] = connect(hosts.get(i), port);
+    }
+    // opens a NodeProbe, converting the checked IOException into an unchecked one
+    private static NodeProbe connect(String host, int port)
+    {
+        try
+        {
+            return new NodeProbe(host, port);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+    // fetches (and thereby resets) the GC stats of every node in parallel on TPE, then aggregates them
+    public GcStats call() throws Exception
+    {
+        final List<Future<GcStats>> futures = new ArrayList<>();
+        for (final NodeProbe probe : probes)
+        {
+            futures.add(TPE.submit(new Callable<GcStats>()
+            {
+                public GcStats call() throws Exception
+                {
+                    final double[] stats = probe.getAndResetGCStats();
+                    return new GcStats(stats[5], stats[4], stats[1], stats[2], stats[3]); // array order is { interval, max, sum, sumsq, bytes, count }
+                }
+            }));
+        }
+
+        List<GcStats> results = new ArrayList<>();
+        for (Future<GcStats> future : futures)
+            results.add(future.get()); // blocks until that node has answered; a failed fetch surfaces here
+        return GcStats.aggregate(results);
+    }
+
+    private static final ExecutorService TPE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("JmxCollector"));
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java 
b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index b6d4e52..f64a40b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +45,18 @@ public class Timing
 
     // TIMING
 
-    private TimingInterval snapInterval(Random rnd) throws InterruptedException
+    public static class TimingResult<E> // pairs a TimingInterval with the extra value collected alongside it
+    {
+        public final E extra;
+        public final TimingInterval timing;
+        public TimingResult(E extra, TimingInterval timing)
+        {
+            this.extra = extra;
+            this.timing = timing;
+        }
+    }
+
+    private <E> TimingResult<E> snap(Random rnd, Callable<E> call) throws 
InterruptedException
     {
         final Timer[] timers = this.timers.toArray(new Timer[0]);
         final CountDownLatch ready = new CountDownLatch(timers.length);
@@ -54,8 +66,18 @@ public class Timing
             timer.requestReport(ready);
         }
 
+        E extra;
+        try
+        {
+            extra = call.call();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
         // TODO fail gracefully after timeout if a thread is stuck
-        if (!ready.await(2L, TimeUnit.MINUTES))
+        if (!ready.await(5L, TimeUnit.MINUTES))
             throw new RuntimeException("Timed out waiting for a timer thread - 
seems one got stuck");
 
         boolean done = true;
@@ -68,7 +90,7 @@ public class Timing
         }
 
         this.done = done;
-        return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, 
history.endNanos());
+        return new TimingResult<>(extra, TimingInterval.merge(rnd, intervals, 
Integer.MAX_VALUE, history.endNanos()));
     }
 
     // build a new timer and add it to the set of running timers
@@ -89,11 +111,11 @@ public class Timing
         return done;
     }
 
-    public TimingInterval snapInterval() throws InterruptedException
+    public <E> TimingResult<E> snap(Callable<E> call) throws 
InterruptedException
     {
-        final TimingInterval interval = snapInterval(rnd);
-        history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 
200000, history.startNanos());
-        return interval;
+        final TimingResult<E> result = snap(rnd, call);
+        history = TimingInterval.merge(rnd, Arrays.asList(result.timing, 
history), 200000, history.startNanos());
+        return result;
     }
 
     public TimingInterval getHistory()

Reply via email to