Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 1435b9a87 -> faf91818b
  refs/heads/trunk 184bb65fc -> 0a09b87dc
Add tooling to detect hot partitions

Patch by Chris Lohfink, reviewed by brandonwilliams for CASSANDRA-7974

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/faf91818
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/faf91818
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/faf91818

Branch: refs/heads/cassandra-2.1
Commit: faf91818b46fb51ed576664a1119315e7b7c3383
Parents: 1435b9a
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Jan 21 11:45:45 2015 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Jan 21 11:45:45 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  69 ++++++++-
 .../cassandra/db/ColumnFamilyStoreMBean.java    |  14 ++
 .../cassandra/metrics/ColumnFamilyMetrics.java  |  19 ++-
 .../org/apache/cassandra/tools/NodeProbe.java   |  26 +++-
 .../org/apache/cassandra/tools/NodeTool.java    |  83 ++++++++++-
 .../org/apache/cassandra/utils/TopKSampler.java | 139 ++++++++++++++++++
 .../apache/cassandra/utils/TopKSamplerTest.java | 147 +++++++++++++++++++
 8 files changed, 482 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c2bab8..f1eaa77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Add tooling to detect hot partitions (CASSANDRA-7974)
  * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
  * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
  * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f7a691e..0c95b0e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -26,14 +26,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import javax.management.*; +import javax.management.openmbean.*; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Predicate; +import com.google.common.base.*; import com.google.common.collect.*; import com.google.common.util.concurrent.*; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.Uninterruptibles; import org.json.simple.*; import org.slf4j.Logger; @@ -68,14 +66,18 @@ import org.apache.cassandra.io.sstable.metadata.CompactionMetadata; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.ColumnFamilyMetrics; +import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamLockfile; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.TopKSampler.SamplerResult; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.MemtableAllocator; +import com.clearspring.analytics.stream.Counter; + public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class); @@ -102,6 +104,39 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean new NamedThreadFactory("MemtableReclaimMemory"), "internal"); + private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; + private static final String[] COUNTER_DESCS = new String[] + { "partition key in raw hex bytes", + "value of this partition for given sampler", + "value is within the error bounds plus or minus of this", + "the partition key turned into a human readable format" }; + private static final CompositeType COUNTER_COMPOSITE_TYPE; + private static final TabularType COUNTER_TYPE; + + private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; + private static final String[] SAMPLER_DESCS = new String[] + { "cardinality of partitions", + "list of counter results" }; + + private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; + private static final CompositeType SAMPLING_RESULT; + + static + { + try + { + OpenType<?>[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; + COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes); + COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES); + + OpenType<?>[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE }; + SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes); + } catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + public final Keyspace keyspace; public final String name; public final CFMetaData metadata; @@ -1152,6 +1187,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Memtable mt = data.getMemtableFor(opGroup, replayPosition); final long timeDelta = mt.put(key, 
columnFamily, indexer, opGroup); maybeUpdateRowCache(key); + metric.samplers.get(Sampler.WRITES).addSample(key.getKey()); metric.writeLatency.addNano(System.nanoTime() - start); if(timeDelta < Long.MAX_VALUE) metric.colUpdateTimeDeltaHistogram.update(timeDelta); @@ -1915,10 +1951,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap()); } + if (columns != null) + metric.samplers.get(Sampler.READS).addSample(filter.key.getKey()); metric.updateSSTableIterated(controller.getSstablesIterated()); return columns; } + public void beginLocalSampling(String sampler, int capacity) + { + metric.samplers.get(Sampler.valueOf(sampler)).beginSampling(capacity); + } + + public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException + { + SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler)) + .finishSampling(count); + TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); + for (Counter<ByteBuffer> counter : samplerResults.topK) + { + byte[] key = counter.getItem().array(); + result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] { + Hex.bytesToHex(key), // raw + counter.getCount(), // count + counter.getError(), // error + metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string + } + return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{ + samplerResults.cardinality, result}); + } + public void cleanupCache() { Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 
3418b26..4df593b 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; + /** * The MBean interface for ColumnFamilyStore */ @@ -402,4 +405,15 @@ public interface ColumnFamilyStoreMBean * @return the size of SSTables in "snapshots" subdirectory which aren't live anymore */ public long trueSnapshotsSize(); + + /** + * begin sampling for a specific sampler with a given capacity. The cardinality may + * be larger than the capacity, but depending on the use case it may affect its accuracy + */ + public void beginLocalSampling(String sampler, int capacity); + + /** + * @return top <i>count</i> items for the sampler since beginLocalSampling was called + */ + public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index b906750..c82569d 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -17,9 +17,8 @@ */ package org.apache.cassandra.metrics; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; +import java.nio.ByteBuffer; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -28,11 +27,13 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.SSTableReader; import 
org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.utils.TopKSampler; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.*; +import com.yammer.metrics.core.Timer; import com.yammer.metrics.util.RatioGauge; /** @@ -144,6 +145,7 @@ public class ColumnFamilyMetrics public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalNameFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalNameFactory, "Range"); + public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; /** * stores metrics that will be rolled into a single global metric */ @@ -203,6 +205,12 @@ public class ColumnFamilyMetrics { factory = new ColumnFamilyMetricNameFactory(cfs); + samplers = Maps.newHashMap(); + for (Sampler sampler : Sampler.values()) + { + samplers.put(sampler, new TopKSampler<ByteBuffer>()); + } + memtableColumnsCount = createColumnFamilyGauge("MemtableColumnsCount", new Gauge<Long>() { public Long value() @@ -766,4 +774,9 @@ public class ColumnFamilyMetrics return new MetricName(groupName, "ColumnFamily", metricName, "all", mbeanName.toString()); } } + + public static enum Sampler + { + READS, WRITES + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 00f9686..67cc7f1 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -28,8 +28,7 @@ import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import 
java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import javax.management.*; import javax.management.openmbean.CompositeData; @@ -37,11 +36,11 @@ import javax.management.remote.JMXConnectionNotification; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; -import javax.management.openmbean.TabularData; +import javax.management.openmbean.*; import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; +import com.google.common.collect.*; +import com.google.common.util.concurrent.Uninterruptibles; import com.yammer.metrics.reporting.JmxReporter; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; @@ -53,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManagerMBean; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.FailureDetectorMBean; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; +import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceMBean; import org.apache.cassandra.repair.RepairParallelism; @@ -312,6 +312,22 @@ public class NodeProbe implements AutoCloseable } } + public Map<Sampler, CompositeData> getPartitionSample(String ks, String cf, int capacity, int duration, int count, List<Sampler> samplers) throws OpenDataException + { + ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf); + for(Sampler sampler : samplers) + { + cfsProxy.beginLocalSampling(sampler.name(), capacity); + } + Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS); + Map<Sampler, CompositeData> result = Maps.newHashMap(); + for(Sampler sampler : samplers) + { + result.put(sampler, cfsProxy.finishLocalSampling(sampler.name(), count)); + } + return result; + } + public 
void invalidateCounterCache() { cacheService.invalidateCounterCache(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 2cc0b98..12496fc 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -27,13 +27,11 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; -import javax.management.openmbean.TabularData; +import javax.management.openmbean.*; import com.google.common.base.Joiner; import com.google.common.base.Throwables; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Maps; +import com.google.common.collect.*; import com.yammer.metrics.reporting.JmxReporter; import io.airlift.command.*; @@ -47,6 +45,7 @@ import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler; import org.apache.cassandra.net.MessagingServiceMBean; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.service.CacheServiceMBean; @@ -146,6 +145,7 @@ public class NodeTool Drain.class, TruncateHints.class, TpStats.class, + TopPartitions.class, SetLoggingLevel.class, GetLoggingLevels.class ); @@ -925,6 +925,81 @@ public class NodeTool } } + @Command(name = "toppartitions", description = "Sample and print the most active partitions for a given column family") + public static class TopPartitions extends NodeToolCmd + { + @Arguments(usage = "<keyspace> <cfname> <duration>", description = "The keyspace, 
column family name, and duration in milliseconds") + private List<String> args = new ArrayList<>(); + @Option(name = "-s", description = "Capacity of stream summary, closer to the actual cardinality of partitions will yield more accurate results (Default: 256)") + private int size = 256; + @Option(name = "-k", description = "Number of the top partitions to list (Default: 10)") + private int topCount = 10; + @Option(name = "-a", description = "Comma separated list of samplers to use (Default: all)") + private String samplers = join(Sampler.values(), ','); + @Override + public void execute(NodeProbe probe) + { + checkArgument(args.size() == 3, "toppartitions requires keyspace, column family name, and duration"); + checkArgument(topCount < size, "TopK count (-k) option must be smaller then the summary capacity (-s)"); + String keyspace = args.get(0); + String cfname = args.get(1); + Integer duration = Integer.parseInt(args.get(2)); + // generate the list of samplers + List<Sampler> targets = Lists.newArrayList(); + for (String s : samplers.split(",")) + { + try + { + targets.add(Sampler.valueOf(s.toUpperCase())); + } catch (Exception e) + { + throw new IllegalArgumentException(s + " is not a valid sampler, choose one of: " + join(Sampler.values(), ", ")); + } + } + + Map<Sampler, CompositeData> results; + try + { + results = probe.getPartitionSample(keyspace, cfname, size, duration, topCount, targets); + } catch (OpenDataException e) + { + throw new RuntimeException(e); + } + boolean first = true; + for(Entry<Sampler, CompositeData> result : results.entrySet()) + { + CompositeData sampling = result.getValue(); + // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436 + List<CompositeData> topk = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values()); + Collections.sort(topk, new Ordering<CompositeData>() + { + public int compare(CompositeData left, CompositeData right) + { + return Long.compare((long) 
right.get("count"), (long) left.get("count")); + } + }); + if(!first) + System.out.println(); + System.out.println(result.getKey().toString()+ " Sampler:"); + System.out.printf(" Cardinality: ~%d (%d capacity)%n", (long) sampling.get("cardinality"), size); + System.out.printf(" Top %d partitions:%n", topCount); + if (topk.size() == 0) + { + System.out.println("\tNothing recorded during sampling period..."); + } else + { + int offset = 0; + for (CompositeData entry : topk) + offset = Math.max(offset, entry.get("string").toString().length()); + System.out.printf("\t%-" + offset + "s%10s%10s%n", "Partition", "Count", "+/-"); + for (CompositeData entry : topk) + System.out.printf("\t%-" + offset + "s%10d%10d%n", entry.get("string").toString(), entry.get("count"), entry.get("error")); + } + first = false; + } + } + } + @Command(name = "cfhistograms", description = "Print statistic histograms for a given column family") public static class CfHistograms extends NodeToolCmd { http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/utils/TopKSampler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/TopKSampler.java b/src/java/org/apache/cassandra/utils/TopKSampler.java new file mode 100644 index 0000000..29d46286 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/TopKSampler.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.*; + +import org.apache.cassandra.concurrent.*; +import org.slf4j.*; + +import com.clearspring.analytics.stream.*; +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; +import com.google.common.annotations.VisibleForTesting; + +public class TopKSampler<T> +{ + private static final Logger logger = LoggerFactory.getLogger(TopKSampler.class); + private volatile boolean enabled = false; + + @VisibleForTesting + static final ThreadPoolExecutor samplerExecutor = new JMXEnabledThreadPoolExecutor(1, 1, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Sampler"), + "internal"); + + private StreamSummary<T> summary; + @VisibleForTesting + HyperLogLogPlus hll; + + /** + * Start to record samples + * + * @param capacity + * Number of sample items to keep in memory, the lower this is + * the less accurate results are. For best results use value + * close to cardinality, but understand the memory trade offs. 
+ */ + public synchronized void beginSampling(int capacity) + { + if (!enabled) + { + summary = new StreamSummary<T>(capacity); + hll = new HyperLogLogPlus(14); + enabled = true; + } + } + + /** + * Call to stop collecting samples, and gather the results + * @param count Number of most frequent items to return + */ + public synchronized SamplerResult<T> finishSampling(int count) + { + List<Counter<T>> results = Collections.EMPTY_LIST; + long cardinality = 0; + if (enabled) + { + enabled = false; + results = summary.topK(count); + cardinality = hll.cardinality(); + } + return new SamplerResult<T>(results, cardinality); + } + + public void addSample(T item) + { + addSample(item, 1); + } + + /** + * Adds a sample to statistics collection. This method is non-blocking and will + * use the "Sampler" thread pool to record results if the sampler is enabled. If not + * sampling this is a NOOP + */ + public void addSample(final T item, final int value) + { + if (enabled) + { + final Object lock = this; + samplerExecutor.execute(new Runnable() + { + public void run() + { + // samplerExecutor is single threaded but still need + // synchronization against jmx calls to finishSampling + synchronized (lock) + { + if (enabled) + { + try + { + summary.offer(item, value); + hll.offer(item); + } catch (Exception e) + { + logger.debug("Failure to offer sample", e); + } + } + } + } + }); + } + } + + /** + * Represents the cardinality and the topK ranked items collected during a + * sample period + */ + public static class SamplerResult<S> implements Serializable + { + public final List<Counter<S>> topK; + public final long cardinality; + + public SamplerResult(List<Counter<S>> topK, long cardinality) + { + this.topK = topK; + this.cardinality = cardinality; + } + } + +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java new file mode 100644 index 0000000..dc3b91c --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java @@ -0,0 +1,147 @@ +package org.apache.cassandra.utils; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.utils.TopKSampler.SamplerResult; +import org.junit.Test; + +import com.clearspring.analytics.stream.Counter; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Uninterruptibles; + +public class TopKSamplerTest +{ + + @Test + public void testSamplerSingleInsertionsEqualMulti() throws TimeoutException + { + TopKSampler<String> sampler = new TopKSampler<String>(); + sampler.beginSampling(10); + insert(sampler); + waitForEmpty(1000); + SamplerResult single = sampler.finishSampling(10); + + TopKSampler<String> sampler2 = new TopKSampler<String>(); + sampler2.beginSampling(10); + for(int i = 1; i <= 10; i++) + { + sampler2.addSample("item" + i, i); + } + waitForEmpty(1000); + Assert.assertEquals(countMap(single.topK), countMap(sampler2.finishSampling(10).topK)); + Assert.assertEquals(sampler.hll.cardinality(), sampler2.hll.cardinality()); + } + + @Test + public void testSamplerOutOfOrder() throws TimeoutException + { + TopKSampler<String> sampler = new TopKSampler<String>(); + sampler.beginSampling(10); + insert(sampler); + waitForEmpty(1000); + SamplerResult single = sampler.finishSampling(10); + single = sampler.finishSampling(10); + } + + /** + * checking for exceptions from SS/HLL which are not thread safe + */ + @Test + public void testMultithreadedAccess() throws Exception + { + final AtomicBoolean running = new AtomicBoolean(true); + final 
CountDownLatch latch = new CountDownLatch(1); + final TopKSampler<String> sampler = new TopKSampler<String>(); + + new Thread(new Runnable() + { + public void run() + { + try + { + while (running.get()) + { + insert(sampler); + } + } finally + { + latch.countDown(); + } + } + + } + ,"inserter").start(); + try + { + // start/stop in fast iterations + for(int i = 0; i<100; i++) + { + sampler.beginSampling(i); + sampler.finishSampling(i); + } + // start/stop with pause to let it build up past capacity + for(int i = 0; i<3; i++) + { + sampler.beginSampling(i); + Thread.sleep(250); + sampler.finishSampling(i); + } + + // with empty results + running.set(false); + latch.await(1, TimeUnit.SECONDS); + waitForEmpty(1000); + for(int i = 0; i<10; i++) + { + sampler.beginSampling(i); + Thread.sleep(i); + sampler.finishSampling(i); + } + } finally + { + running.set(false); + } + } + + private void insert(TopKSampler<String> sampler) + { + for(int i = 1; i <= 10; i++) + { + for(int j = 0; j < i; j++) + { + sampler.addSample("item" + i); + } + } + } + + private void waitForEmpty(int timeoutMs) throws TimeoutException + { + int timeout = 0; + while (!TopKSampler.samplerExecutor.getQueue().isEmpty()) + { + timeout++; + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + if (timeout * 100 > timeoutMs) + { + throw new TimeoutException("TRACE executor not cleared within timeout"); + } + } + } + + private <T> Map<T, Long> countMap(List<Counter<T>> target) + { + Map<T, Long> counts = Maps.newHashMap(); + for(Counter<T> counter : target) + { + counts.put(counter.getItem(), counter.getCount()); + } + return counts; + } +}