Repository: cassandra
Updated Branches:
  refs/heads/trunk dd650c8e8 -> 45f250535


Allow noarg toppartitions

Closes #214

Patch by Chris Lohfink; Reviewed by Dinesh Joshi for CASSANDRA-14360


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45f25053
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45f25053
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45f25053

Branch: refs/heads/trunk
Commit: 45f250535a5d26011ce7d71c9c09b31d758bfd7b
Parents: dd650c8
Author: Chris Lohfink <clohf...@apple.com>
Authored: Wed May 2 00:52:16 2018 -0700
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Sat May 12 11:19:22 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/StorageService.java       | 38 ++++++++
 .../cassandra/service/StorageServiceMBean.java  |  5 +
 .../org/apache/cassandra/tools/NodeProbe.java   | 21 +++--
 .../cassandra/tools/nodetool/TopPartitions.java | 98 +++++++++++++-------
 .../cassandra/tools/TopPartitionsTest.java      | 67 +++++++++++++
 6 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aa4ef39..01c67f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
  * Audit logging for database activity (CASSANDRA-12151)
  * Clean up build artifacts in docs container (CASSANDRA-14432)
  * Minor network authz improvements (Cassandra-14413)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index dd2f178..a62af6f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -33,9 +33,13 @@ import java.util.stream.StreamSupport;
 
 import javax.annotation.Nullable;
 import javax.management.*;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
+import com.clearspring.analytics.stream.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -76,6 +80,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.repair.*;
 import org.apache.cassandra.repair.messages.RepairOption;
@@ -99,6 +104,7 @@ import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 import org.apache.cassandra.utils.logging.LoggingSupportFactory;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventType;
@@ -5269,6 +5275,38 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return sampledKeys;
     }
 
+    /*
+     * little hard to parse for JMX MBean requirements, but the output looks 
something like:
+     *
+     *  {"keyspace.table":
+     *    {"SAMPLER": [{cardinality:i partitions: [{raw:"", string:"", 
count:i, error:i}, ...]}, ...]}
+     *  }
+     */
+    @Override
+    public Map<String, Map<String, CompositeData>> samplePartitions(long 
duration, int capacity, int count, List<String> samplers) throws 
OpenDataException
+    {
+        for (String sampler : samplers)
+        {
+            for (ColumnFamilyStore table : ColumnFamilyStore.all())
+            {
+                table.beginLocalSampling(sampler, capacity);
+            }
+        }
+
+        Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS);
+        ConcurrentHashMap<String, Map<String, CompositeData>> result = new 
ConcurrentHashMap<>();
+        for (String sampler : samplers)
+        {
+            for (ColumnFamilyStore table : ColumnFamilyStore.all())
+            {
+                String name = table.keyspace.getName() + "." + table.name;
+                Map<String, CompositeData> topk = result.computeIfAbsent(name, 
x -> new HashMap<>());
+                topk.put(sampler, table.finishLocalSampling(sampler, count));
+            }
+        }
+        return result;
+    }
+
     public void rebuildSecondaryIndex(String ksName, String cfName, String... 
idxNames)
     {
         String[] indices = asList(idxNames).stream()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8c4b618..1282105 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -27,9 +27,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.management.NotificationEmitter;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.metrics.TableMetrics.Sampler;
 
 public interface StorageServiceMBean extends NotificationEmitter
 {
@@ -628,6 +631,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      */
     public void setTraceProbability(double probability);
 
+    public Map<String, Map<String, CompositeData>> samplePartitions(long 
duration, int capacity, int count, List<String> samplers) throws 
OpenDataException;
+
     /**
      * Returns the configured tracing probability.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index f556ffc..7cec99d 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -89,6 +89,7 @@ import 
org.apache.cassandra.streaming.management.StreamStateCompositeData;
 
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -418,21 +419,27 @@ public class NodeProbe implements AutoCloseable
             }
         }
     }
+    public Map<String, Map<String, CompositeData>> getPartitionSample(int 
capacity, int duration, int count, List<String> samplers) throws 
OpenDataException
+    {
+        return ssProxy.samplePartitions(duration, capacity, count, samplers);
+    }
 
-    public Map<Sampler, CompositeData> getPartitionSample(String ks, String 
cf, int capacity, int duration, int count, List<Sampler> samplers) throws 
OpenDataException
+    public Map<String, Map<String, CompositeData>> getPartitionSample(String 
ks, String cf, int capacity, int duration, int count, List<String> samplers) 
throws OpenDataException
     {
         ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf);
-        for(Sampler sampler : samplers)
+        for(String sampler : samplers)
         {
-            cfsProxy.beginLocalSampling(sampler.name(), capacity);
+            cfsProxy.beginLocalSampling(sampler, capacity);
         }
         Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS);
-        Map<Sampler, CompositeData> result = Maps.newHashMap();
-        for(Sampler sampler : samplers)
+        Map<String, CompositeData> result = Maps.newHashMap();
+        for(String sampler : samplers)
         {
-            result.put(sampler, cfsProxy.finishLocalSampling(sampler.name(), 
count));
+            result.put(sampler, cfsProxy.finishLocalSampling(sampler, count));
         }
-        return result;
+        return new ImmutableMap.Builder<String, Map<String, CompositeData>>()
+                .put(ks + "." + cf, result)
+                .build();
     }
 
     public void invalidateCounterCache()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java 
b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java
index 73bf2fb..ee03cd2 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java
@@ -19,9 +19,6 @@ package org.apache.cassandra.tools.nodetool;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.join;
-import io.airlift.airline.Arguments;
-import io.airlift.airline.Command;
-import io.airlift.airline.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,14 +34,20 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Pair;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 
-@Command(name = "toppartitions", description = "Sample and print the most 
active partitions for a given column family")
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+
+@Command(name = "toppartitions", description = "Sample and print the most 
active partitions")
 public class TopPartitions extends NodeToolCmd
 {
-    @Arguments(usage = "<keyspace> <cfname> <duration>", description = "The 
keyspace, column family name, and duration in milliseconds")
+    @Arguments(usage = "[keyspace table] [duration]", description = "The 
keyspace, table name, and duration in milliseconds")
     private List<String> args = new ArrayList<>();
     @Option(name = "-s", description = "Capacity of stream summary, closer to 
the actual cardinality of partitions will yield more accurate results (Default: 
256)")
     private int size = 256;
@@ -55,63 +58,86 @@ public class TopPartitions extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        checkArgument(args.size() == 3, "toppartitions requires keyspace, 
column family name, and duration");
+        checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 
0, "Invalid arguments, either [keyspace table duration] or [duration] or no 
args");
         checkArgument(topCount < size, "TopK count (-k) option must be smaller 
then the summary capacity (-s)");
-        String keyspace = args.get(0);
-        String cfname = args.get(1);
-        Integer duration = Integer.valueOf(args.get(2));
+        String keyspace = null;
+        String table = null;
+        Integer duration = 10000;
+        if(args.size() == 3)
+        {
+            keyspace = args.get(0);
+            table = args.get(1);
+            duration = Integer.valueOf(args.get(2));
+        }
+        else if (args.size() == 1)
+        {
+            duration = Integer.valueOf(args.get(0));
+        }
         // generate the list of samplers
-        List<Sampler> targets = Lists.newArrayList();
+        List<String> targets = Lists.newArrayList();
         for (String s : samplers.split(","))
         {
             try
             {
-                targets.add(Sampler.valueOf(s.toUpperCase()));
+                targets.add(Sampler.valueOf(s.toUpperCase()).toString());
             } catch (Exception e)
             {
                 throw new IllegalArgumentException(s + " is not a valid 
sampler, choose one of: " + join(Sampler.values(), ", "));
             }
         }
 
-        Map<Sampler, CompositeData> results;
+        Map<String, Map<String, CompositeData>> results;
         try
         {
-            results = probe.getPartitionSample(keyspace, cfname, size, 
duration, topCount, targets);
+            if (keyspace == null)
+            {
+                results = probe.getPartitionSample(size, duration, topCount, 
targets);
+            }
+            else
+            {
+                results = probe.getPartitionSample(keyspace, table, size, 
duration, topCount, targets);
+            }
         } catch (OpenDataException e)
         {
             throw new RuntimeException(e);
         }
         boolean first = true;
-        for(Entry<Sampler, CompositeData> result : results.entrySet())
+        for(String sampler : targets)
         {
-            CompositeData sampling = result.getValue();
-            // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436
-            List<CompositeData> topk = (List<CompositeData>) (Object) 
Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values());
-            Collections.sort(topk, new Ordering<CompositeData>()
+            if(!first)
+                System.out.println();
+            first = false;
+            System.out.printf(sampler + " Sampler Top %d partitions:%n", 
topCount);
+            TableBuilder out = new TableBuilder();
+            out.add("\t", "Table", "Partition", "Count", "+/-");
+            List<Pair<String, CompositeData>> topk = new ArrayList<>(topCount);
+            for (Entry<String, Map<String, CompositeData>> tableResult : 
results.entrySet())
+            {
+                String tableName = tableResult.getKey();
+                CompositeData sampling = tableResult.getValue().get(sampler);
+                // weird casting for 
http://bugs.sun.com/view_bug.do?bug_id=6548436
+                for(CompositeData cd : (List<CompositeData>) (Object) 
Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values()))
+                {
+                    topk.add(Pair.create(tableName, cd));
+                }
+            }
+            Collections.sort(topk, new Ordering<Pair<String, CompositeData>>()
             {
-                public int compare(CompositeData left, CompositeData right)
+                public int compare(Pair<String, CompositeData> left, 
Pair<String, CompositeData> right)
                 {
-                    return Long.compare((long) right.get("count"), (long) 
left.get("count"));
+                    return Long.compare((long) right.right.get("count"), 
(long) left.right.get("count"));
                 }
             });
-            if(!first)
-                System.out.println();
-            System.out.println(result.getKey().toString()+ " Sampler:");
-            System.out.printf("  Cardinality: ~%d (%d capacity)%n", 
sampling.get("cardinality"), size);
-            System.out.printf("  Top %d partitions:%n", topCount);
-            if (topk.size() == 0)
+            for (Pair<String, CompositeData> entry : topk.subList(0, 
Math.min(topk.size(), 10)))
             {
-                System.out.println("\tNothing recorded during sampling 
period...");
-            } else
+                CompositeData cd = entry.right;
+                out.add("\t", entry.left, cd.get("string").toString(), 
cd.get("count").toString(), cd.get("error").toString());
+            }
+            out.printTo(System.out);
+            if (topk.size() == 0)
             {
-                int offset = 0;
-                for (CompositeData entry : topk)
-                    offset = Math.max(offset, 
entry.get("string").toString().length());
-                System.out.printf("\t%-" + offset + "s%10s%10s%n", 
"Partition", "Count", "+/-");
-                for (CompositeData entry : topk)
-                    System.out.printf("\t%-" + offset + "s%10d%10d%n", 
entry.get("string").toString(), entry.get("count"), entry.get("error"));
+                System.out.println("\t Nothing recorded during sampling 
period...");
             }
-            first = false;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45f25053/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java 
b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
new file mode 100644
index 0000000..64cea3a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java
@@ -0,0 +1,67 @@
+package org.apache.cassandra.tools;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TopPartitionsTest
+{
+    @BeforeClass
+    public static void loadSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Test
+    public void testServiceTopPartitionsNoArg() throws Exception
+    {
+        BlockingQueue<Map<String, Map<String, CompositeData>>> q = new 
ArrayBlockingQueue<>(1);
+        ColumnFamilyStore.all();
+        Executors.newCachedThreadPool().execute(() ->
+        {
+            try
+            {
+                q.put(StorageService.instance.samplePartitions(1000, 100, 10, 
Lists.newArrayList("READS", "WRITES")));
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+            }
+        });
+        SystemKeyspace.persistLocalMetadata();
+        Map<String, Map<String, CompositeData>> result = q.poll(11, 
TimeUnit.SECONDS);
+        List<CompositeData> cd = (List<CompositeData>) (Object) 
Lists.newArrayList(((TabularDataSupport) 
result.get("system.local").get("WRITES").get("partitions")).values());
+        assertEquals(1, cd.size());
+    }
+
+    @Test
+    public void testServiceTopPartitionsSingleTable() throws Exception
+    {
+        ColumnFamilyStore.getIfExists("system", 
"local").beginLocalSampling("READS", 5);
+        String req = "SELECT * FROM system.%s WHERE key='%s'";
+        executeInternal(format(req, SystemKeyspace.LOCAL, 
SystemKeyspace.LOCAL));
+        CompositeData result = ColumnFamilyStore.getIfExists("system", 
"local").finishLocalSampling("READS", 5);
+        List<CompositeData> cd = (List<CompositeData>) (Object) 
Lists.newArrayList(((TabularDataSupport) result.get("partitions")).values());
+        assertEquals(1, cd.size());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to