Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 14e46e462 -> c253ed4fa refs/heads/trunk 41904684b -> a01019d2c
Prevent compaction strategies from looping indefinitely Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-14079 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c253ed4f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c253ed4f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c253ed4f Branch: refs/heads/cassandra-3.11 Commit: c253ed4fa7b7b5667879bb41be09fe9658224c4e Parents: 14e46e4 Author: Paulo Motta <pauloricard...@gmail.com> Authored: Sat Nov 25 01:55:35 2017 +1100 Committer: Paulo Motta <pa...@apache.org> Committed: Fri Dec 1 05:07:31 2017 +1100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../DateTieredCompactionStrategy.java | 16 ++- .../compaction/LeveledCompactionStrategy.java | 22 ++- .../db/compaction/LeveledManifest.java | 22 ++- .../SizeTieredCompactionStrategy.java | 12 ++ .../TimeWindowCompactionStrategy.java | 12 ++ .../AbstractCompactionStrategyTest.java | 144 +++++++++++++++++++ 7 files changed, 222 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fc18dc3..ce279f2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.2 + * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079) * Cache disk boundaries (CASSANDRA-13215) * Add asm jar to build.xml for maven builds (CASSANDRA-11193) * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 729ddc0..bb9f4b9 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -73,6 +73,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy @SuppressWarnings("resource") public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { + List<SSTableReader> previousCandidate = null; while (true) { List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore); @@ -80,9 +81,20 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (latestBucket.isEmpty()) return null; + // Already tried acquiring references without success. It means there is a race with + // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager + if (latestBucket.equals(previousCandidate)) + { + logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," + + "unless it happens frequently, in which case it must be reported. Will retry later.", + latestBucket); + return null; + } + LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); if (modifier != null) return new CompactionTask(cfs, modifier, gcBefore); + previousCandidate = latestBucket; } } @@ -170,6 +182,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy // no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding List<SSTableReader> list = new ArrayList<>(); Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE)); + if (list.isEmpty()) + return 0; return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp())) .getMaxTimestamp(); } @@ -462,7 +476,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy return uncheckedOptions; } - public CompactionLogger.Strategy strategyLogger() + public CompactionLogger.Strategy strategyLogger() { return new CompactionLogger.Strategy() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index f943b19..43c81a4 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -62,12 +62,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE; SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options); if (options != null) - { - if (options.containsKey(SSTABLE_SIZE_OPTION)) - { - configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION)); + { + if (options.containsKey(SSTABLE_SIZE_OPTION)) + { + configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION)); if (!tolerateSstableSize) - { + { if (configuredMaxSSTableSize >= 1000) logger.warn("Max sstable size of {}MB is configured for {}.{}; having a unit of compaction this large is probably a bad idea", configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName()); @@ -113,6 +113,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { + Collection<SSTableReader> previousCandidate = null; while (true) { OperationType op; @@ -136,6 +137,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy op = OperationType.COMPACTION; } + // Already tried acquiring references without success. It means there is a race with + // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager + if (candidate.sstables.equals(previousCandidate)) + { + logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," + + "unless it happens frequently, in which case it must be reported. Will retry later.", + candidate.sstables); + return null; + } + LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION); if (txn != null) { @@ -143,6 +154,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy newTask.setCompactionType(op); return newTask; } + previousCandidate = candidate.sstables; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 3d118de..ceb3811 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -169,8 +169,28 @@ public class LeveledManifest { logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e); } - generations[0].add(reader); + if (!contains(reader)) + { + generations[0].add(reader); + } + else + { + // An SSTable being added multiple times to this manifest indicates a programming error, but we don't + // throw an AssertionError because this shouldn't break the compaction strategy. Instead we log it + // together with a RuntimeException so the stack is print for troubleshooting if this ever happens. + logger.warn("SSTable {} is already present on leveled manifest and should not be re-added.", reader, new RuntimeException()); + } + } + } + + private boolean contains(SSTableReader reader) + { + for (int i = 0; i < generations.length; i++) + { + if (generations[i].contains(reader)) + return true; } + return false; } public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 8302a9b..0dd134a 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -176,6 +176,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy @SuppressWarnings("resource") public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { + List<SSTableReader> previousCandidate = null; while (true) { List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore); @@ -183,9 +184,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy if (hottestBucket.isEmpty()) return null; + // Already tried acquiring references without success. It means there is a race with + // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager + if (hottestBucket.equals(previousCandidate)) + { + logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," + + "unless it happens frequently, in which case it must be reported. Will retry later.", + hottestBucket); + return null; + } + LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION); if (transaction != null) return new CompactionTask(cfs, transaction, gcBefore); + previousCandidate = hottestBucket; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java index 9532cc4..38cef70 100644 --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@ -72,6 +72,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { + List<SSTableReader> previousCandidate = null; while (true) { List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore); @@ -79,9 +80,20 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy if (latestBucket.isEmpty()) return null; + // Already tried acquiring references without success. It means there is a race with + // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager + if (latestBucket.equals(previousCandidate)) + { + logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," + + "unless it happens frequently, in which case it must be reported. Will retry later.", + latestBucket); + return null; + } + LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); if (modifier != null) return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps); + previousCandidate = latestBucket; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java new file mode 100644 index 0000000..481b394 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.CompactionParams; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; + +public class AbstractCompactionStrategyTest +{ + private static final String KEYSPACE1 = "Keyspace1"; + private static final String LCS_TABLE = "LCS_TABLE"; + private static final String STCS_TABLE = "STCS_TABLE"; + private static final String DTCS_TABLE = "DTCS_TABLE"; + private static final String TWCS_TABLE = "TWCS_TABLE"; + + @BeforeClass + public static void loadData() throws ConfigurationException + { + Map<String, String> stcsOptions = new HashMap<>(); + stcsOptions.put("tombstone_compaction_interval", "1"); + + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE1, LCS_TABLE) + .compaction(CompactionParams.lcs(Collections.emptyMap())), + SchemaLoader.standardCFMD(KEYSPACE1, STCS_TABLE) + .compaction(CompactionParams.scts(Collections.emptyMap())), + SchemaLoader.standardCFMD(KEYSPACE1, DTCS_TABLE) + .compaction(CompactionParams.create(DateTieredCompactionStrategy.class, Collections.emptyMap())), + SchemaLoader.standardCFMD(KEYSPACE1, TWCS_TABLE) + .compaction(CompactionParams.create(TimeWindowCompactionStrategy.class, Collections.emptyMap()))); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).disableAutoCompaction(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).disableAutoCompaction(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).disableAutoCompaction(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).disableAutoCompaction(); + } + + @After + public void tearDown() + { + + Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).truncateBlocking(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).truncateBlocking(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).truncateBlocking(); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).truncateBlocking(); + } + + @Test(timeout=30000) + public void testGetNextBackgroundTaskDoesNotBlockLCS() + { + testGetNextBackgroundTaskDoesNotBlock(LCS_TABLE); + } + + @Test(timeout=30000) + public void testGetNextBackgroundTaskDoesNotBlockSTCS() + { + testGetNextBackgroundTaskDoesNotBlock(STCS_TABLE); + } + + @Test(timeout=30000) + public void testGetNextBackgroundTaskDoesNotBlockDTCS() + { + testGetNextBackgroundTaskDoesNotBlock(DTCS_TABLE); + } + + @Test(timeout=30000) + public void testGetNextBackgroundTaskDoesNotBlockTWCS() + { + testGetNextBackgroundTaskDoesNotBlock(TWCS_TABLE); + } + + public void testGetNextBackgroundTaskDoesNotBlock(String table) + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table); + AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getStrategies().get(1).get(0); + + // Add 4 sstables + for (int i = 1; i <= 4; i++) + { + insertKeyAndFlush(table, i); + } + + // Check they are returned on the next background task + try (LifecycleTransaction txn = strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()).transaction) + { + Assert.assertEquals(cfs.getLiveSSTables(), txn.originals()); + } + + // now remove sstables on the tracker, to simulate a concurrent transaction + cfs.getTracker().removeUnsafe(cfs.getLiveSSTables()); + + // verify the compaction strategy will return null + Assert.assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds())); + } + + + private static void insertKeyAndFlush(String table, int key) + { + long timestamp = System.currentTimeMillis(); + DecoratedKey dk = Util.dk(String.format("%03d", key)); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table); + new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey()) + .clustering(String.valueOf(key)) + .add("val", "val") + .build() + .applyUnsafe(); + cfs.forceBlockingFlush(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org