This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 296f65e8d1c25f31a87481843d715f5b7dad9d7b Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Jun 30 16:15:17 2021 +0200 Log when compacting many tombstones Patch by marcuse; reviewed by Brandon Williams for CASSANDRA-16780 --- CHANGES.txt | 1 + conf/cassandra.yaml | 3 + src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../io/sstable/format/big/BigTableWriter.java | 10 ++ .../io/sstable/metadata/MetadataCollector.java | 14 ++ .../apache/cassandra/service/StorageService.java | 13 ++ .../cassandra/service/StorageServiceMBean.java | 3 + .../cassandra/distributed/impl/Instance.java | 6 +- .../distributed/test/TombstoneWarningTest.java | 144 +++++++++++++++++++++ 10 files changed, 204 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index d2a0cff..6efd0c0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Log when compacting many tombstones (CASSANDRA-16780) * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799) * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701) * Add a system property to set hostId if not yet initialized (CASSANDRA-14582) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 46d94d9..852945f 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1288,6 +1288,9 @@ unlogged_batch_across_partitions_warn_threshold: 10 # Log a warning when compacting partitions larger than this value compaction_large_partition_warning_threshold_mb: 100 +# Log a warning when writing more tombstones than this value to a partition +compaction_tombstone_warning_threshold: 100000 + # GC Pauses greater than 200 ms will be logged at INFO level # This threshold can be adjusted to minimize logging if necessary # gc_log_threshold_in_ms: 200 diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index ae3e27e..96f047e 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -219,6 +219,7 @@ public class Config public volatile int compaction_throughput_mb_per_sec = 16; public volatile int compaction_large_partition_warning_threshold_mb = 100; public int min_free_space_per_drive_in_mb = 50; + public volatile Integer compaction_tombstone_warning_threshold = 100000; public volatile int concurrent_materialized_view_builders = 1; public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 00ef887..86448c7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1801,6 +1801,16 @@ public class DatabaseDescriptor public static long getCompactionLargePartitionWarningThreshold() { return ByteUnit.MEBI_BYTES.toBytes(conf.compaction_large_partition_warning_threshold_mb); } + public static int getCompactionTombstoneWarningThreshold() + { + return conf.compaction_tombstone_warning_threshold; + } + + public static void setCompactionTombstoneWarningThreshold(int count) + { + conf.compaction_tombstone_warning_threshold = count; + } + public static int getConcurrentValidations() { return conf.concurrent_validations; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index eeb9153..4607d99 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -231,6 +231,7 @@ public class BigTableWriter extends SSTableWriter long endPosition = dataFile.position(); long rowSize = endPosition - startPosition; maybeLogLargePartitionWarning(key, rowSize); + maybeLogManyTombstonesWarning(key, metadataCollector.totalTombstones); metadataCollector.addPartitionSizeInBytes(rowSize); afterAppend(key, endPosition, entry, columnIndexWriter.buffer()); return entry; @@ -259,6 +260,15 @@ public class BigTableWriter extends SSTableWriter } } + private void maybeLogManyTombstonesWarning(DecoratedKey key, int tombstoneCount) + { + if (tombstoneCount > DatabaseDescriptor.getCompactionTombstoneWarningThreshold()) + { + String keyString = metadata().partitionKeyType.getString(key.getKey()); + logger.warn("Writing {} tombstones to {}/{}:{} in sstable {}", tombstoneCount, metadata.keyspace, metadata.name, keyString, getFilename()); + } + } + private static class StatsCollector extends Transformation { private final MetadataCollector collector; diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index be824ef..1e2d121 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MurmurHash; import org.apache.cassandra.utils.streamhist.TombstoneHistogram; import org.apache.cassandra.utils.streamhist.StreamingTombstoneHistogramBuilder; @@ -105,6 +106,7 @@ public class MetadataCollector implements PartitionStatisticsCollector protected boolean hasLegacyCounterShards = false; protected long totalColumnsSet; protected long totalRows; + public int totalTombstones; /** * Default cardinality estimation method is to use HyperLogLog++. @@ -114,6 +116,7 @@ public class MetadataCollector implements PartitionStatisticsCollector */ protected ICardinality cardinality = new HyperLogLogPlus(13, 25); private final ClusteringComparator comparator; + private final int nowInSec = FBUtilities.nowInSeconds(); private final UUID originatingHostId; @@ -149,6 +152,7 @@ public class MetadataCollector implements PartitionStatisticsCollector { long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0); cardinality.offerHashed(hashed); + totalTombstones = 0; return this; } @@ -182,6 +186,8 @@ public class MetadataCollector implements PartitionStatisticsCollector updateTimestamp(newInfo.timestamp()); updateTTL(newInfo.ttl()); updateLocalDeletionTime(newInfo.localExpirationTime()); + if (!newInfo.isLive(nowInSec)) + updateTombstoneCount(); } public void update(Cell<?> cell) @@ -189,6 +195,8 @@ public class MetadataCollector implements PartitionStatisticsCollector updateTimestamp(cell.timestamp()); updateTTL(cell.ttl()); updateLocalDeletionTime(cell.localDeletionTime()); + if (!cell.isLive(nowInSec)) + updateTombstoneCount(); } public void update(DeletionTime dt) @@ -197,6 +205,7 @@ public class MetadataCollector implements PartitionStatisticsCollector { updateTimestamp(dt.markedForDeleteAt()); updateLocalDeletionTime(dt.localDeletionTime()); + updateTombstoneCount(); } } @@ -218,6 +227,11 @@ public class MetadataCollector implements PartitionStatisticsCollector estimatedTombstoneDropTime.update(newLocalDeletionTime); } + private void updateTombstoneCount() + { + ++totalTombstones; + } + private void updateTTL(int newTTL) { ttlTracker.update(newTTL); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a4a9f9b..60cb739 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -5977,4 +5977,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("Changing keyspace count warn threshold from {} to {}", getKeyspaceCountWarnThreshold(), value); DatabaseDescriptor.setKeyspaceCountWarnThreshold(value); } + + public void setCompactionTombstoneWarningThreshold(int count) + { + if (count < 0) + throw new IllegalStateException("compaction tombstone warning threshold needs to be >= 0, not "+count); + logger.info("Setting compaction_tombstone_warning_threshold to {}", count); + DatabaseDescriptor.setCompactionTombstoneWarningThreshold(count); + } + + public int getCompactionTombstoneWarningThreshold() + { + return DatabaseDescriptor.getCompactionTombstoneWarningThreshold(); + } } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index cc69fec..a5a6607 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -856,4 +856,7 @@ public interface StorageServiceMBean extends NotificationEmitter void setTableCountWarnThreshold(int value); int getKeyspaceCountWarnThreshold(); void setKeyspaceCountWarnThreshold(int value); + + public void setCompactionTombstoneWarningThreshold(int count); + public int getCompactionTombstoneWarningThreshold(); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index d772d51..a58f2db 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -185,7 +185,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance String suite = System.getProperty("suitename", "suitename_IS_UNDEFINED"); String clusterId = ClusterIDDefiner.getId(); String instanceId = InstanceIDDefiner.getInstanceId(); - return new FileLogAction(new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId))); + File f = new File(String.format("build/test/logs/%s/%s/%s/%s/system.log", tag, suite, clusterId, instanceId)); + // when creating a cluster globally in a test class we get the logs without the suite, try finding those logs: + if (!f.exists()) + f = new File(String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, instanceId)); + return new FileLogAction(f); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java new file mode 100644 index 0000000..9406432 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/TombstoneWarningTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.LogResult; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TombstoneWarningTest extends TestBaseImpl +{ + private static final int COMPACTION_TOMBSTONE_WARN = 75; + private static final ICluster<IInvokableInstance> cluster; + + static + { + try + { + Cluster.Builder builder = Cluster.build(3); + builder.withConfig(c -> c.set("compaction_tombstone_warning_threshold", COMPACTION_TOMBSTONE_WARN)); + cluster = builder.createWithoutStarting(); + } + catch (IOException e) + { + throw new AssertionError(e); + } + } + + @BeforeClass + public static void setupClass() + { + cluster.startup(); + } + + @Before + public void setup() + { + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + init(cluster); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + } + + @Test + public void regularTombstonesLogTest() + { + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("update %s.tbl set v = null where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j); + assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false); + } + + @Test + public void rowTombstonesLogTest() + { + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j); + assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN , false); + } + + @Test + public void rangeTombstonesLogTest() + { + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where pk = ? and ck >= ? and ck <= ?"), ConsistencyLevel.ALL, i, j, j); + assertTombstoneLogs(99 - (COMPACTION_TOMBSTONE_WARN / 2), true); + } + + @Test + public void ttlTest() throws InterruptedException + { + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?) using ttl 1000"), ConsistencyLevel.ALL, i, j, j); + assertTombstoneLogs(0, true); + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("update %s.tbl using ttl 1 set v = 33 where pk = ? and ck = ?"), ConsistencyLevel.ALL, i, j); + Thread.sleep(1500); + assertTombstoneLogs(99 - COMPACTION_TOMBSTONE_WARN, false); + } + + @Test + public void noTombstonesLogTest() + { + for (int i = 0; i < 100; i++) + for (int j = 0; j < i; j++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, i, j, j); + assertTombstoneLogs(0, false); + } + + private void assertTombstoneLogs(long expectedCount, boolean isRangeTombstones) + { + long mark = cluster.get(1).logs().mark(); + cluster.get(1).flush(KEYSPACE); + String pattern = ".*Writing (?<tscount>\\d+) tombstones to distributed_test_keyspace/tbl:(?<key>\\d+).*"; + LogResult<List<String>> res = cluster.get(1).logs().grep(mark, pattern); + assertEquals(expectedCount, res.getResult().size()); + Pattern p = Pattern.compile(pattern); + for (String r : res.getResult()) + { + Matcher m = p.matcher(r); + assertTrue(m.matches()); + long tombstoneCount = Integer.parseInt(m.group("tscount")); + assertTrue(tombstoneCount > COMPACTION_TOMBSTONE_WARN); + assertEquals(r, Integer.parseInt(m.group("key")) * (isRangeTombstones ? 2 : 1), tombstoneCount); + } + + mark = cluster.get(1).logs().mark(); + cluster.get(1).forceCompact(KEYSPACE, "tbl"); + res = cluster.get(1).logs().grep(mark, pattern); + assertEquals(expectedCount, res.getResult().size()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org