Repository: cassandra Updated Branches: refs/heads/trunk dd650c8e8 -> 45f250535
Allow noarg toppartitions Closes #214 Patch by Chris Lohfink; Reviewed by Dinesh Joshi for CASSANDRA-14360 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45f25053 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45f25053 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45f25053 Branch: refs/heads/trunk Commit: 45f250535a5d26011ce7d71c9c09b31d758bfd7b Parents: dd650c8 Author: Chris Lohfink <clohf...@apple.com> Authored: Wed May 2 00:52:16 2018 -0700 Committer: Jeff Jirsa <jji...@apple.com> Committed: Sat May 12 11:19:22 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/StorageService.java | 38 ++++++++ .../cassandra/service/StorageServiceMBean.java | 5 + .../org/apache/cassandra/tools/NodeProbe.java | 21 +++-- .../cassandra/tools/nodetool/TopPartitions.java | 98 +++++++++++++------- .../cassandra/tools/TopPartitionsTest.java | 67 +++++++++++++ 6 files changed, 187 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index aa4ef39..01c67f4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360) * Audit logging for database activity (CASSANDRA-12151) * Clean up build artifacts in docs container (CASSANDRA-14432) * Minor network authz improvements (Cassandra-14413) http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index dd2f178..a62af6f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -33,9 +33,13 @@ import java.util.stream.StreamSupport; import javax.annotation.Nullable; import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; +import com.clearspring.analytics.stream.Counter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -76,6 +80,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.net.*; import org.apache.cassandra.repair.*; import org.apache.cassandra.repair.messages.RepairOption; @@ -99,6 +104,7 @@ import org.apache.cassandra.streaming.*; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.TopKSampler.SamplerResult; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; @@ -5269,6 +5275,38 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return sampledKeys; } + /* + * little hard to parse for JMX MBean requirements, but the output looks something like: + * + * {"keyspace.table": + * {"SAMPLER": [{cardinality:i partitions: [{raw:"", string:"", count:i, error:i}, ...]}, ...]} + * } + */ + @Override + public Map<String, Map<String, CompositeData>> samplePartitions(long duration, int capacity, int count, List<String> samplers) throws OpenDataException + { + for (String sampler : samplers) + { + for (ColumnFamilyStore table : ColumnFamilyStore.all()) + { + table.beginLocalSampling(sampler, capacity); + } + } + + Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS); + ConcurrentHashMap<String, Map<String, CompositeData>> result = new ConcurrentHashMap<>(); + for (String sampler : samplers) + { + for (ColumnFamilyStore table : ColumnFamilyStore.all()) + { + String name = table.keyspace.getName() + "." + table.name; + Map<String, CompositeData> topk = result.computeIfAbsent(name, x -> new HashMap<>()); + topk.put(sampler, table.finishLocalSampling(sampler, count)); + } + } + return result; + } + public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames) { String[] indices = asList(idxNames).stream() http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 8c4b618..1282105 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -27,9 +27,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.metrics.TableMetrics.Sampler; public interface StorageServiceMBean extends NotificationEmitter { @@ -628,6 +631,8 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void setTraceProbability(double probability); + public Map<String, Map<String, CompositeData>> samplePartitions(long duration, int capacity, int count, List<String> samplers) throws OpenDataException; + /** * Returns the configured tracing probability. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index f556ffc..7cec99d 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -89,6 +89,7 @@ import org.apache.cassandra.streaming.management.StreamStateCompositeData; import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -418,21 +419,27 @@ public class NodeProbe implements AutoCloseable } } } + public Map<String, Map<String, CompositeData>> getPartitionSample(int capacity, int duration, int count, List<String> samplers) throws OpenDataException + { + return ssProxy.samplePartitions(duration, capacity, count, samplers); + } - public Map<Sampler, CompositeData> getPartitionSample(String ks, String cf, int capacity, int duration, int count, List<Sampler> samplers) throws OpenDataException + public Map<String, Map<String, CompositeData>> getPartitionSample(String ks, String cf, int capacity, int duration, int count, List<String> samplers) throws OpenDataException { ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf); - for(Sampler sampler : samplers) + for(String sampler : samplers) { - cfsProxy.beginLocalSampling(sampler.name(), capacity); + cfsProxy.beginLocalSampling(sampler, capacity); } Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS); - Map<Sampler, CompositeData> result = Maps.newHashMap(); - for(Sampler sampler : samplers) + Map<String, CompositeData> result = Maps.newHashMap(); + for(String sampler : samplers) { - result.put(sampler, cfsProxy.finishLocalSampling(sampler.name(), count)); + result.put(sampler, cfsProxy.finishLocalSampling(sampler, count)); } - return result; + return new ImmutableMap.Builder<String, Map<String, CompositeData>>() + .put(ks + "." + cf, result) + .build(); } public void invalidateCounterCache() http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java index 73bf2fb..ee03cd2 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java +++ b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java @@ -19,9 +19,6 @@ package org.apache.cassandra.tools.nodetool; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.join; -import io.airlift.airline.Arguments; -import io.airlift.airline.Command; -import io.airlift.airline.Option; import java.util.ArrayList; import java.util.Collections; @@ -37,14 +34,20 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Pair; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -@Command(name = "toppartitions", description = "Sample and print the most active partitions for a given column family") +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; + +@Command(name = "toppartitions", description = "Sample and print the most active partitions") public class TopPartitions extends NodeToolCmd { - @Arguments(usage = "<keyspace> <cfname> <duration>", description = "The keyspace, column family name, and duration in milliseconds") + @Arguments(usage = "[keyspace table] [duration]", description = "The keyspace, table name, and duration in milliseconds") private List<String> args = new ArrayList<>(); @Option(name = "-s", description = "Capacity of stream summary, closer to the actual cardinality of partitions will yield more accurate results (Default: 256)") private int size = 256; @@ -55,63 +58,86 @@ public class TopPartitions extends NodeToolCmd @Override public void execute(NodeProbe probe) { - checkArgument(args.size() == 3, "toppartitions requires keyspace, column family name, and duration"); + checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace table duration] or [duration] or no args"); checkArgument(topCount < size, "TopK count (-k) option must be smaller then the summary capacity (-s)"); - String keyspace = args.get(0); - String cfname = args.get(1); - Integer duration = Integer.valueOf(args.get(2)); + String keyspace = null; + String table = null; + Integer duration = 10000; + if(args.size() == 3) + { + keyspace = args.get(0); + table = args.get(1); + duration = Integer.valueOf(args.get(2)); + } + else if (args.size() == 1) + { + duration = Integer.valueOf(args.get(0)); + } // generate the list of samplers - List<Sampler> targets = Lists.newArrayList(); + List<String> targets = Lists.newArrayList(); for (String s : samplers.split(",")) { try { - targets.add(Sampler.valueOf(s.toUpperCase())); + targets.add(Sampler.valueOf(s.toUpperCase()).toString()); } catch (Exception e) { throw new IllegalArgumentException(s + " is not a valid sampler, choose one of: " + join(Sampler.values(), ", ")); } } - Map<Sampler, CompositeData> results; + Map<String, Map<String, CompositeData>> results; try { - results = probe.getPartitionSample(keyspace, cfname, size, duration, topCount, targets); + if (keyspace == null) + { + results = probe.getPartitionSample(size, duration, topCount, targets); + } + else + { + results = probe.getPartitionSample(keyspace, table, size, duration, topCount, targets); + } } catch (OpenDataException e) { throw new RuntimeException(e); } boolean first = true; - for(Entry<Sampler, CompositeData> result : results.entrySet()) + for(String sampler : targets) { - CompositeData sampling = result.getValue(); - // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436 - List<CompositeData> topk = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values()); - Collections.sort(topk, new Ordering<CompositeData>() + if(!first) + System.out.println(); + first = false; + System.out.printf(sampler + " Sampler Top %d partitions:%n", topCount); + TableBuilder out = new TableBuilder(); + out.add("\t", "Table", "Partition", "Count", "+/-"); + List<Pair<String, CompositeData>> topk = new ArrayList<>(topCount); + for (Entry<String, Map<String, CompositeData>> tableResult : results.entrySet()) + { + String tableName = tableResult.getKey(); + CompositeData sampling = tableResult.getValue().get(sampler); + // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436 + for(CompositeData cd : (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values())) + { + topk.add(Pair.create(tableName, cd)); + } + } + Collections.sort(topk, new Ordering<Pair<String, CompositeData>>() { - public int compare(CompositeData left, CompositeData right) + public int compare(Pair<String, CompositeData> left, Pair<String, CompositeData> right) { - return Long.compare((long) right.get("count"), (long) left.get("count")); + return Long.compare((long) right.right.get("count"), (long) left.right.get("count")); } }); - if(!first) - System.out.println(); - System.out.println(result.getKey().toString()+ " Sampler:"); - System.out.printf(" Cardinality: ~%d (%d capacity)%n", sampling.get("cardinality"), size); - System.out.printf(" Top %d partitions:%n", topCount); - if (topk.size() == 0) + for (Pair<String, CompositeData> entry : topk.subList(0, Math.min(topk.size(), 10))) { - System.out.println("\tNothing recorded during sampling period..."); - } else + CompositeData cd = entry.right; + out.add("\t", entry.left, cd.get("string").toString(), cd.get("count").toString(), cd.get("error").toString()); + } + out.printTo(System.out); + if (topk.size() == 0) { - int offset = 0; - for (CompositeData entry : topk) - offset = Math.max(offset, entry.get("string").toString().length()); - System.out.printf("\t%-" + offset + "s%10s%10s%n", "Partition", "Count", "+/-"); - for (CompositeData entry : topk) - System.out.printf("\t%-" + offset + "s%10d%10d%n", entry.get("string").toString(), entry.get("count"), entry.get("error")); + System.out.println("\t Nothing recorded during sampling period..."); } - first = false; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java new file mode 100644 index 0000000..64cea3a --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java @@ -0,0 +1,67 @@ +package org.apache.cassandra.tools; + +import static java.lang.String.format; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.StorageService; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TopPartitionsTest +{ + @BeforeClass + public static void loadSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + } + + @Test + public void testServiceTopPartitionsNoArg() throws Exception + { + BlockingQueue<Map<String, Map<String, CompositeData>>> q = new ArrayBlockingQueue<>(1); + ColumnFamilyStore.all(); + Executors.newCachedThreadPool().execute(() -> + { + try + { + q.put(StorageService.instance.samplePartitions(1000, 100, 10, Lists.newArrayList("READS", "WRITES"))); + } + catch (Exception e) + { + e.printStackTrace(); + } + }); + SystemKeyspace.persistLocalMetadata(); + Map<String, Map<String, CompositeData>> result = q.poll(11, TimeUnit.SECONDS); + List<CompositeData> cd = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) result.get("system.local").get("WRITES").get("partitions")).values()); + assertEquals(1, cd.size()); + } + + @Test + public void testServiceTopPartitionsSingleTable() throws Exception + { + ColumnFamilyStore.getIfExists("system", "local").beginLocalSampling("READS", 5); + String req = "SELECT * FROM system.%s WHERE key='%s'"; + executeInternal(format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL)); + CompositeData result = ColumnFamilyStore.getIfExists("system", "local").finishLocalSampling("READS", 5); + List<CompositeData> cd = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) result.get("partitions")).values()); + assertEquals(1, cd.size()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org