This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 0daf21244f Add JMX call to getSSTableCountPerTWCSBucket for TWCS 0daf21244f is described below commit 0daf21244fc0187d092616834d38df1a77dcabf0 Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Mon Jul 25 11:58:42 2022 -0400 Add JMX call to getSSTableCountPerTWCSBucket for TWCS Patch by Stefan Podkowinski; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-17774 Co-authored-by: Stefan Podkowinski <s.podkowin...@gmail.com> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 18 +++++++++- .../cassandra/db/ColumnFamilyStoreMBean.java | 6 ++++ .../db/compaction/CompactionStrategyManager.java | 33 ++++++++++++++++++ .../compaction/TimeWindowCompactionStrategy.java | 10 ++++++ .../cassandra/tools/nodetool/stats/StatsTable.java | 1 + .../tools/nodetool/stats/TableStatsHolder.java | 1 + .../compaction/CompactionStrategyManagerTest.java | 40 ++++++++++++++++++++-- 8 files changed, 107 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7c9137f824..bdeef8172d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Added JMX call to getSSTableCountPerTWCSBucket for TWCS (CASSANDRA-17774) * When doing a host replacement, -Dcassandra.broadcast_interval_ms is used to know when to check the ring but checks that the ring wasn't changed in -Dcassandra.ring_delay_ms, changes to ring delay should not depend on when we publish load stats (CASSANDRA-17776) * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754) * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 94ca18084d..a1a5ce4df0 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -22,7 +22,6 @@ import java.io.PrintStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; -import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -609,6 +608,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return directories; } + @Override public List<String> getDataPaths() throws IOException { List<String> dataPaths = new ArrayList<>(); @@ -1925,12 +1925,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner } } + @Override public void beginLocalSampling(String sampler, int capacity, int durationMillis) { metric.samplers.get(SamplerType.valueOf(sampler)).beginSampling(capacity, durationMillis); } @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override public List<CompositeData> finishLocalSampling(String sampler, int count) throws OpenDataException { Sampler samplerImpl = metric.samplers.get(SamplerType.valueOf(sampler)); @@ -1949,11 +1951,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return result; } + @Override public boolean isCompactionDiskSpaceCheckEnabled() { return compactionSpaceCheck; } + @Override public void compactionDiskSpaceCheck(boolean enable) { compactionSpaceCheck = enable; @@ -2964,21 +2968,31 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return indexManager.getBuiltIndexNames(); } + @Override public int getUnleveledSSTables() { return compactionStrategyManager.getUnleveledSSTables(); } + @Override public int[] getSSTableCountPerLevel() { return compactionStrategyManager.getSSTableCountPerLevel(); } + @Override public long[] getPerLevelSizeBytes() { return compactionStrategyManager.getPerLevelSizeBytes(); } + @Override + public int[] getSSTableCountPerTWCSBucket() + { + return compactionStrategyManager.getSSTableCountPerTWCSBucket(); + } + + @Override public int getLevelFanoutSize() { return compactionStrategyManager.getLevelFanoutSize(); @@ -3074,6 +3088,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner } } + @Override public double getDroppableTombstoneRatio() { double allDroppable = 0; @@ -3088,6 +3103,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner return allColumns > 0 ? allDroppable / allColumns : 0; } + @Override public long trueSnapshotsSize() { return getDirectories().trueSnapshotsSize(); diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index 5b6fd16fe1..d674011231 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -217,6 +217,12 @@ public interface ColumnFamilyStoreMBean */ public long[] getPerLevelSizeBytes(); + /** + * @return sstable count for each bucket in TWCS. null unless time window compaction is used. + * array index corresponds to bucket(int[0] is for most recent, ...). + */ + public int[] getSSTableCountPerTWCSBucket(); + /** * @return sstable fanout size for level compaction strategy. */ diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 65359b1d88..ca67ddb0ea 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -23,14 +23,19 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -141,6 +146,8 @@ public class CompactionStrategyManager implements INotificationConsumer private volatile long maxSSTableSizeBytes; private volatile String name; + public static int TWCS_BUCKET_COUNT_MAX = 128; + public CompactionStrategyManager(ColumnFamilyStore cfs) { this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent()); @@ -610,6 +617,32 @@ public class CompactionStrategyManager implements INotificationConsumer } } + public int[] getSSTableCountPerTWCSBucket() + { + readLock.lock(); + try + { + List<Map<Long, Integer>> countsByBucket = Stream.concat( + StreamSupport.stream(repaired.allStrategies().spliterator(), false), + StreamSupport.stream(unrepaired.allStrategies().spliterator(), false)) + .filter((TimeWindowCompactionStrategy.class)::isInstance) + .map(s -> ((TimeWindowCompactionStrategy)s).getSSTableCountByBuckets()) + .collect(Collectors.toList()); + return countsByBucket.isEmpty() ? null : sumCountsByBucket(countsByBucket, TWCS_BUCKET_COUNT_MAX); + } + finally + { + readLock.unlock(); + } + } + + static int[] sumCountsByBucket(List<Map<Long, Integer>> countsByBucket, int max) + { + TreeMap<Long, Integer> merged = new TreeMap<>(Comparator.reverseOrder()); + countsByBucket.stream().flatMap(e -> e.entrySet().stream()).forEach(e -> merged.merge(e.getKey(), e.getValue(), Integer::sum)); + return merged.values().stream().limit(max).mapToInt(i -> i).toArray(); + } + static int[] sumArrays(int[] a, int[] b) { int[] res = new int[Math.max(a.length, b.length)]; diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java index d3b30210d4..9b9b82c82f 100644 --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@ -30,6 +30,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; @@ -57,6 +59,9 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy private long lastExpiredCheck; private long highestWindowSeen; + // This is accessed in both the threading context of compaction / repair and also JMX + private volatile Map<Long, Integer> sstableCountByBuckets = Collections.emptyMap(); + public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { super(cfs, options); @@ -179,6 +184,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy this.highestWindowSeen); this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks; + this.sstableCountByBuckets = buckets.left.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> buckets.left.get(k).size())); if (!mostInteresting.sstables.isEmpty()) return mostInteresting.sstables; return null; @@ -412,6 +418,10 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy return Long.MAX_VALUE; } + public Map<Long, Integer> getSSTableCountByBuckets() + { + return sstableCountByBuckets; + } public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException { diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java index 8b5090bd86..48b795b422 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java @@ -71,6 +71,7 @@ public class StatsTable public String droppedMutations; public List<String> sstablesInEachLevel = new ArrayList<>(); public List<String> sstableBytesInEachLevel = new ArrayList<>(); + public int[] sstableCountPerTWCSBucket = null; public Boolean isInCorrectLocation = null; // null: option not active public double droppableTombstoneRatio; public Map<String, String> topSizePartitions; diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java index c6b2301c2c..fc6ef145d6 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java @@ -241,6 +241,7 @@ public class TableStatsHolder implements StatsHolder statsTable.sstablesInEachLevel.add(count + ((count > maxCount) ? "/" + maxCount : "")); } } + statsTable.sstableCountPerTWCSBucket = table.getSSTableCountPerTWCSBucket(); long[] leveledSSTablesBytes = table.getPerLevelSizeBytes(); if (leveledSSTablesBytes != null) diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java index 9960e8e0fe..8a9f2fb225 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java @@ -28,11 +28,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Files; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -395,6 +398,41 @@ public class CompactionStrategyManagerTest } } + @Test + public void testCountsByBuckets() + { + Assert.assertArrayEquals( + new int[] {2, 2, 4}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1), + ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX)); + Assert.assertArrayEquals( + new int[] {1, 1, 3}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + ImmutableMap.of(60000L, 1, 0L, 1), + ImmutableMap.of(0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX)); + Assert.assertArrayEquals( + new int[] {1, 1}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + ImmutableMap.of(60000L, 1, 0L, 1), + ImmutableMap.of()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX)); + Assert.assertArrayEquals( + new int[] {8, 4}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4), + ImmutableMap.of(60000L, 2, 0L, 1, 180000L, 4)), 2)); + Assert.assertArrayEquals( + new int[] {1, 1, 2}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + Collections.emptyMap(), + ImmutableMap.of(60000L, 1, 0L, 2, 180000L, 1)), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX)); + Assert.assertArrayEquals( + new int[] {}, + CompactionStrategyManager.sumCountsByBucket(ImmutableList.of( + Collections.emptyMap(), + Collections.emptyMap()), CompactionStrategyManager.TWCS_BUCKET_COUNT_MAX)); + } + private MockCFS createJBODMockCFS(int disks) { // Create #disks data directories to simulate JBOD @@ -464,8 +502,6 @@ public class CompactionStrategyManagerTest return index; } - - class MockBoundaryManager { private final ColumnFamilyStore cfs; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org