Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 d002c7edc -> 0e652e754 refs/heads/trunk 080816637 -> 477c54c03
GCInspector more closely tracks GC; cassandra-stress and nodetool report it patch by benedict; reviewed by tjake Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e652e75 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e652e75 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e652e75 Branch: refs/heads/cassandra-2.1 Commit: 0e652e7548edbfda2dbf47ce2272bb707a14a089 Parents: d002c7e Author: Benedict Elliott Smith <bened...@apache.org> Authored: Sun Sep 14 09:09:37 2014 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Sep 14 09:09:37 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/GCInspector.java | 83 ++++++++++++- .../cassandra/service/GCInspectorMXBean.java | 25 ++++ .../org/apache/cassandra/tools/NodeProbe.java | 11 +- .../org/apache/cassandra/tools/NodeTool.java | 15 +++ .../apache/cassandra/stress/StressAction.java | 2 +- .../apache/cassandra/stress/StressMetrics.java | 61 ++++++++-- .../cassandra/stress/settings/SettingsPort.java | 3 + .../cassandra/stress/util/JmxCollector.java | 119 +++++++++++++++++++ .../apache/cassandra/stress/util/Timing.java | 36 ++++-- 10 files changed, 331 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6e029ab..4c39f5c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * GCInspector more closely tracks GC; cassandra-stress and nodetool report it * nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173) * Add human readable option to nodetool commands (CASSANDRA-5433) * Don't try to set repairedAt on old sstables (CASSANDRA-7913) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/service/GCInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java index d04b250..c4bffac 100644 --- a/src/java/org/apache/cassandra/service/GCInspector.java +++ b/src/java/org/apache/cassandra/service/GCInspector.java @@ -22,6 +22,8 @@ import java.lang.management.MemoryUsage; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.management.MBeanServer; import javax.management.Notification; import javax.management.NotificationListener; @@ -35,11 +37,55 @@ import com.sun.management.GarbageCollectionNotificationInfo; import org.apache.cassandra.io.sstable.SSTableDeletingTask; import org.apache.cassandra.utils.StatusLogger; -public class GCInspector implements NotificationListener +public class GCInspector implements NotificationListener, GCInspectorMXBean { + public static final String MBEAN_NAME = "org.apache.cassandra.service:type=GCInspector"; private static final Logger logger = LoggerFactory.getLogger(GCInspector.class); - final static long MIN_DURATION = 200; - final static long MIN_DURATION_TPSTATS = 1000; + final static long MIN_LOG_DURATION = 200; + final static long MIN_LOG_DURATION_TPSTATS = 1000; + + static final class State + { + final double maxRealTimeElapsed; + final double totalRealTimeElapsed; + final double sumSquaresRealTimeElapsed; + final double totalBytesReclaimed; + final double count; + final long startNanos; + + State(double extraElapsed, double extraBytes, State prev) + { + this.totalRealTimeElapsed = prev.totalRealTimeElapsed + extraElapsed; + this.totalBytesReclaimed = prev.totalBytesReclaimed + extraBytes; + this.sumSquaresRealTimeElapsed = 
prev.sumSquaresRealTimeElapsed + (extraElapsed * extraElapsed); + this.startNanos = prev.startNanos; + this.count = prev.count + 1; + this.maxRealTimeElapsed = Math.max(prev.maxRealTimeElapsed, extraElapsed); + } + + State() + { + count = maxRealTimeElapsed = sumSquaresRealTimeElapsed = totalRealTimeElapsed = totalBytesReclaimed = 0; + startNanos = System.nanoTime(); + } + } + + final AtomicReference<State> state = new AtomicReference<>(new State()); + + public GCInspector() + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + + try + { + mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + } public static void register() throws Exception { @@ -66,6 +112,7 @@ public class GCInspector implements NotificationListener StringBuilder sb = new StringBuilder(); sb.append(info.getGcName()).append(" GC in ").append(duration).append("ms. "); + long bytes = 0; List<String> keys = new ArrayList<>(info.getGcInfo().getMemoryUsageBeforeGc().keySet()); Collections.sort(keys); for (String key : keys) @@ -79,16 +126,24 @@ public class GCInspector implements NotificationListener sb.append(after.getUsed()); if (!key.equals(keys.get(keys.size() - 1))) sb.append("; "); + bytes += before.getUsed() - after.getUsed(); } } + while (true) + { + State prev = state.get(); + if (state.compareAndSet(prev, new State(duration, bytes, prev))) + break; + } + String st = sb.toString(); - if (duration > MIN_DURATION) + if (duration > MIN_LOG_DURATION) logger.info(st); else if (logger.isDebugEnabled()) logger.debug(st); - if (duration > MIN_DURATION_TPSTATS) + if (duration > MIN_LOG_DURATION_TPSTATS) StatusLogger.log(); // if we just finished a full collection and we're still using a lot of memory, try to reduce the pressure @@ -96,4 +151,22 @@ public class GCInspector implements NotificationListener SSTableDeletingTask.rescheduleFailedTasks(); } } + + public State getTotalSinceLastCheck() + { + return 
state.getAndSet(new State()); + } + + public double[] getAndResetStats() + { + State state = getTotalSinceLastCheck(); + double[] r = new double[6]; + r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos); + r[1] = state.maxRealTimeElapsed; + r[2] = state.totalRealTimeElapsed; + r[3] = state.sumSquaresRealTimeElapsed; + r[4] = state.totalBytesReclaimed; + r[5] = state.count; + return r; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/service/GCInspectorMXBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/GCInspectorMXBean.java b/src/java/org/apache/cassandra/service/GCInspectorMXBean.java new file mode 100644 index 0000000..c26a67c --- /dev/null +++ b/src/java/org/apache/cassandra/service/GCInspectorMXBean.java @@ -0,0 +1,25 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.service; + +public interface GCInspectorMXBean +{ + // returns { interval (ms), max(gc real time (ms)), sum(gc real time (ms)), sum((gc real time (ms))^2), sum(gc bytes), count(gc) } + public double[] getAndResetStats(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 3f3073d..203730a 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -79,6 +79,7 @@ public class NodeProbe implements AutoCloseable private CompactionManagerMBean compactionProxy; private StorageServiceMBean ssProxy; private MemoryMXBean memProxy; + private GCInspectorMXBean gcProxy; private RuntimeMXBean runtimeProxy; private StreamManagerMBean streamProxy; public MessagingServiceMBean msProxy; @@ -169,7 +170,10 @@ public class NodeProbe implements AutoCloseable spProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageProxyMBean.class); name = new ObjectName(HintedHandOffManager.MBEAN_NAME); hhProxy = JMX.newMBeanProxy(mbeanServerConn, name, HintedHandOffManagerMBean.class); - } catch (MalformedObjectNameException e) + name = new ObjectName(GCInspector.MBEAN_NAME); + gcProxy = JMX.newMBeanProxy(mbeanServerConn, name, GCInspectorMXBean.class); + } + catch (MalformedObjectNameException e) { throw new RuntimeException( "Invalid ObjectName? 
Please report this as a bug.", e); @@ -374,6 +378,11 @@ public class NodeProbe implements AutoCloseable } } + public double[] getAndResetGCStats() + { + return gcProxy.getAndResetStats(); + } + public Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> getColumnFamilyStoreMBeanProxies() { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index e7d1404..731afcc 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -96,6 +96,7 @@ public class NodeTool DisableGossip.class, EnableHandoff.class, EnableThrift.class, + GcStats.class, GetCompactionThreshold.class, GetCompactionThroughput.class, GetStreamThroughput.class, @@ -2361,6 +2362,20 @@ public class NodeTool } } + @Command(name = "gcstats", description = "Print GC Statistics") + public static class GcStats extends NodeTool.NodeToolCmd // prints the GC statistics returned by NodeProbe.getAndResetGCStats(), which also resets them on the node + { + @Override + public void execute(NodeProbe probe) + { + double[] stats = probe.getAndResetGCStats(); // { interval ms, max ms, sum ms, sum of squares ms, bytes reclaimed, collection count } + double mean = stats[5] == 0 ? 0 : stats[2] / stats[5]; // no collections in the interval: avoid 0/0 = NaN + double stdev = stats[5] == 0 ? 0 : Math.sqrt(Math.max(0, (stats[3] / stats[5]) - (mean * mean))); // clamp so rounding error cannot push the variance below zero + System.out.printf("%25s%25s%25s%25s%25s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections"); // one column per title; 25 wide because two titles are 21 characters long + System.out.printf("%25.0f%25.0f%25.0f%25.0f%25.0f%25.0f%n", stats[0], stats[1], stats[2], stdev, stats[4] / (1 << 20), stats[5]); // values are doubles, so %f (a precision on %d throws); bytes converted to MB to match the title + } + } + @Command(name = "truncatehints", description = "Truncate all hints on the local node, or truncate hints for the endpoint(s) specified.") public static class TruncateHints extends NodeToolCmd { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index f697dd9..da32284 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -182,7 +182,7 @@ public class StressAction implements Runnable if (settings.rate.opRateTargetPerSecond > 0) rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond); - final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis); + final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings); final CountDownLatch done = new CountDownLatch(threadCount); final Consumer[] consumers = new Consumer[threadCount]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java index dd3b867..9e8e961 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java @@ -23,12 +23,16 @@ package org.apache.cassandra.stress; import java.io.PrintStream; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.stress.settings.StressSettings; +import org.apache.cassandra.stress.util.JmxCollector; import org.apache.cassandra.stress.util.Timing; import org.apache.cassandra.stress.util.TimingInterval; import 
org.apache.cassandra.stress.util.Uncertainty; @@ -45,10 +49,33 @@ public class StressMetrics private final Uncertainty rowRateUncertainty = new Uncertainty(); private final CountDownLatch stopped = new CountDownLatch(1); private final Timing timing = new Timing(); + private final Callable<JmxCollector.GcStats> gcStatsCollector; + private volatile JmxCollector.GcStats totalGcStats; - public StressMetrics(PrintStream output, final long logIntervalMillis) + public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings) { this.output = output; + Callable<JmxCollector.GcStats> gcStatsCollector; + try + { + gcStatsCollector = new JmxCollector(settings.node.nodes, settings.port.jmxPort); + totalGcStats = new JmxCollector.GcStats(0); + } + catch (Throwable t) + { + t.printStackTrace(); + System.err.println("Failed to connect over JMX; not collecting these stats"); + totalGcStats = new JmxCollector.GcStats(Double.POSITIVE_INFINITY); + gcStatsCollector = new Callable<JmxCollector.GcStats>() + { + public JmxCollector.GcStats call() throws Exception + { + return totalGcStats; + } + }; + } + this.gcStatsCollector = gcStatsCollector; + printHeader("", output); thread = tf.newThread(new Runnable() { @@ -121,10 +148,10 @@ public class StressMetrics private void update() throws InterruptedException { - TimingInterval interval = timing.snapInterval(); - if (interval.partitionCount != 0) - printRow("", interval, timing.getHistory(), rowRateUncertainty, output); - rowRateUncertainty.update(interval.adjustedRowRate()); + Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector); + if (result.timing.partitionCount != 0) + printRow("", result.timing, timing.getHistory(), result.extra, rowRateUncertainty, output); + rowRateUncertainty.update(result.timing.adjustedRowRate()); if (timing.done()) stop = true; } @@ -132,15 +159,15 @@ public class StressMetrics // PRINT FORMATTING - public static final String HEADFORMAT = 
"%-10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s"; - public static final String ROWFORMAT = "%-10d,%8.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f"; + public static final String HEADFORMAT = "%-10s,%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%8s,%8s,%8s,%8s"; + public static final String ROWFORMAT = "%-10d,%10.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f"; private static void printHeader(String prefix, PrintStream output) { - output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr")); + output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "gc: #", "max ms", "sum ms", "sdv ms", "mb")); } - private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output) + private static void printRow(String prefix, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output) { output.println(prefix + String.format(ROWFORMAT, total.operationCount, @@ -155,7 +182,13 @@ public class StressMetrics interval.rankLatency(0.999f), interval.maxLatency(), total.runTime() / 1000f, - opRateUncertainty.getUncertainty())); + opRateUncertainty.getUncertainty(), + gcStats.count, + gcStats.maxms, + gcStats.summs, + gcStats.sdvms, + gcStats.bytes / (1 << 20) + )); } public void summarise() @@ -172,11 +205,16 @@ public class StressMetrics output.println(String.format("latency 99th percentile : %.1f", history.rankLatency(0.99f))); output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f))); output.println(String.format("latency max : %.1f", history.maxLatency())); + output.println(String.format("total gc count : %.0f", totalGcStats.count)); + 
output.println(String.format("total gc mb : %.0f", totalGcStats.bytes / (1 << 20))); + output.println(String.format("total gc time (s) : %.0f", totalGcStats.summs / 1000)); + output.println(String.format("avg gc time(ms) : %.0f", totalGcStats.summs / totalGcStats.count)); + output.println(String.format("stdev gc time(ms) : %.0f", totalGcStats.sdvms)); output.println("Total operation time : " + DurationFormatUtils.formatDuration( history.runTime(), "HH:mm:ss", true)); } - public static final void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out) + public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out) { int idLen = 0; for (String id : ids) @@ -187,6 +225,7 @@ public class StressMetrics printRow(String.format(formatstr, ids.get(i)), summarise.get(i).timing.getHistory(), summarise.get(i).timing.getHistory(), + summarise.get(i).totalGcStats, summarise.get(i).rowRateUncertainty, out ); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java index 6f12f99..1e10e37 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java @@ -31,11 +31,13 @@ public class SettingsPort implements Serializable public final int nativePort; public final int thriftPort; + public final int jmxPort; public SettingsPort(PortOptions options) { nativePort = Integer.parseInt(options.nativePort.value()); thriftPort = Integer.parseInt(options.thriftPort.value()); + jmxPort = Integer.parseInt(options.jmxPort.value()); } // Option Declarations @@ -44,6 +46,7 @@ public class SettingsPort implements Serializable { final OptionSimple 
nativePort = new OptionSimple("native=", "[0-9]+", "9042", "Use this port for the Cassandra native protocol", false); final OptionSimple thriftPort = new OptionSimple("thrift=", "[0-9]+", "9160", "Use this port for the thrift protocol", false); + final OptionSimple jmxPort = new OptionSimple("jmx=", "[0-9]+", "7199", "Use this port for retrieving statistics over jmx", false); @Override public List<? extends Option> options() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java b/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java new file mode 100644 index 0000000..9611b2a --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/util/JmxCollector.java @@ -0,0 +1,119 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.stress.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.tools.NodeProbe; + +public class JmxCollector implements Callable<JmxCollector.GcStats> +{ + + public static class GcStats // immutable summary of GC activity: count, bytes reclaimed, max/sum/sum-of-squares of pause ms, and the derived standard deviation + { + public final double count; + public final double bytes; + public final double maxms; + public final double summs; + public final double sumsqms; + public final double sdvms; + public GcStats(double count, double bytes, double maxms, double summs, double sumsqms) + { + this.count = count; + this.bytes = bytes; + this.maxms = maxms; + this.summs = summs; + this.sumsqms = sumsqms; + double mean = summs / count; + double stdev = Math.sqrt((sumsqms / count) - (mean * mean)); + if (Double.isNaN(stdev)) // e.g. count == 0 gives 0/0 + stdev = 0; + this.sdvms = stdev; + } + public GcStats(double fill) + { + this(fill, fill, fill, fill, fill); + } + public static GcStats aggregate(List<GcStats> stats) // combines several summaries into one: totals are added, the maximum pause is the largest seen + { + double count = 0, bytes = 0, maxms = 0, summs = 0, sumsqms = 0; + for (GcStats stat : stats) + { + count += stat.count; + bytes += stat.bytes; + maxms = Math.max(maxms, stat.maxms); // a maximum, not a sum: adding per-node maxima overstated the longest pause + summs += stat.summs; + sumsqms += stat.sumsqms; + } + return new GcStats(count, bytes, maxms, summs, sumsqms); + } + } + + final NodeProbe[] probes; + + // TODO: should expand to whole cluster + public JmxCollector(List<String> hosts, int port) + { + probes = new NodeProbe[hosts.size()]; + for (int i = 0 ; i < hosts.size() ; i++) + probes[i] = connect(hosts.get(i), port); + } + + private static NodeProbe connect(String host, int port) + { + try + { + return new NodeProbe(host, port); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public GcStats call() throws Exception + { + final List<Future<GcStats>> 
futures = new ArrayList<>(); + for (final NodeProbe probe : probes) + { + futures.add(TPE.submit(new Callable<GcStats>() + { + public GcStats call() throws Exception + { + final double[] stats = probe.getAndResetGCStats(); + return new GcStats(stats[5], stats[4], stats[1], stats[2], stats[3]); + } + })); + } + + List<GcStats> results = new ArrayList<>(); + for (Future<GcStats> future : futures) + results.add(future.get()); + return GcStats.aggregate(results); + } + + private static final ExecutorService TPE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("JmxCollector")); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e652e75/tools/stress/src/org/apache/cassandra/stress/util/Timing.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java index b6d4e52..f64a40b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,7 +45,18 @@ public class Timing // TIMING - private TimingInterval snapInterval(Random rnd) throws InterruptedException + public static class TimingResult<E> + { + public final E extra; + public final TimingInterval timing; + public TimingResult(E extra, TimingInterval timing) + { + this.extra = extra; + this.timing = timing; + } + } + + private <E> TimingResult<E> snap(Random rnd, Callable<E> call) throws InterruptedException { final Timer[] timers = this.timers.toArray(new Timer[0]); final CountDownLatch ready = new CountDownLatch(timers.length); @@ -54,8 
+66,18 @@ public class Timing timer.requestReport(ready); } + E extra; + try + { + extra = call.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + // TODO fail gracefully after timeout if a thread is stuck - if (!ready.await(2L, TimeUnit.MINUTES)) + if (!ready.await(5L, TimeUnit.MINUTES)) throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck"); boolean done = true; @@ -68,7 +90,7 @@ public class Timing } this.done = done; - return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos()); + return new TimingResult<>(extra, TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos())); } // build a new timer and add it to the set of running timers @@ -89,11 +111,11 @@ public class Timing return done; } - public TimingInterval snapInterval() throws InterruptedException + public <E> TimingResult<E> snap(Callable<E> call) throws InterruptedException { - final TimingInterval interval = snapInterval(rnd); - history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 200000, history.startNanos()); - return interval; + final TimingResult<E> result = snap(rnd, call); + history = TimingInterval.merge(rnd, Arrays.asList(result.timing, history), 200000, history.startNanos()); + return result; } public TimingInterval getHistory()