This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new b58a5c8 LeveledCompactionStrategy disk space check improvements b58a5c8 is described below commit b58a5c86e89e10ad4d39756c5314a756eb18204d Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Thu Jan 20 14:18:46 2022 +0100 LeveledCompactionStrategy disk space check improvements Patch by marcuse; reviewed by Caleb Rackliffe for CASSANDRA-17272 --- CHANGES.txt | 2 + .../cassandra/db/compaction/CompactionTask.java | 5 +- .../db/compaction/LeveledCompactionTask.java | 45 +++++- .../compaction/writers/CompactionAwareWriter.java | 7 +- .../writers/MajorLeveledCompactionWriter.java | 8 +- .../compaction/writers/MaxSSTableSizeWriter.java | 8 +- .../SplittingSizeTieredCompactionWriter.java | 9 +- test/unit/org/apache/cassandra/MockSchema.java | 3 +- .../compaction/LeveledCompactionStrategyTest.java | 161 ++++++++++++++++++++- .../{ => writers}/CompactionAwareWriterTest.java | 48 +++++- 10 files changed, 278 insertions(+), 18 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 527450d..4d07a3d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,8 @@ 3.0.27 + * LeveledCompactionStrategy disk space check improvements (CASSANDRA-17272) * Lazy transaction log replica creation allows incorrect replica content divergence during anticompaction (CASSANDRA-17273) + 3.0.26 * Fix conversion from megabits to bytes in streaming rate limiter (CASSANDRA-17243) * Upgrade logback to 1.2.9 (CASSANDRA-17204) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index d29d5e6..d023cef 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -95,9 +95,10 @@ public class CompactionTask extends AbstractCompactionTask if (partialCompactionsAcceptable() && transaction.originals().size() > 1) { // Try again w/o the largest one. - logger.warn("insufficient space to compact all requested files. {}MB required, {}", + logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {}", (float) expectedSize / 1024 / 1024, - StringUtils.join(transaction.originals(), ", ")); + StringUtils.join(transaction.originals(), ", "), + transaction.opId()); // Note that we have removed files that are still marked as compacting. // This suboptimal but ok since the caller will unmark all the sstables at the end. SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals()); diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index f8c3521..20ff21c 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.compaction; import java.util.Set; +import java.util.stream.Collectors; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; @@ -55,11 +56,53 @@ public class LeveledCompactionTask extends CompactionTask @Override protected boolean partialCompactionsAcceptable() { - return level == 0; + throw new UnsupportedOperationException("This is now handled in reduceScopeForLimitedSpace"); } protected int getLevel() { return level; } + + @Override + public boolean reduceScopeForLimitedSpace(long expectedSize) + { + if (transaction.originals().size() > 1 && level <= 1) + { + // Try again w/o the largest one. + logger.warn("insufficient space to do L0 -> L{} compaction. {}MiB required, {} for compaction {}", + level, + (float) expectedSize / 1024 / 1024, + transaction.originals() + .stream() + .map(sstable -> String.format("%s (level=%s, size=%s)", sstable, sstable.getSSTableLevel(), sstable.onDiskLength())) + .collect(Collectors.joining(",")), + transaction.opId()); + // Note that we have removed files that are still marked as compacting. + // This suboptimal but ok since the caller will unmark all the sstables at the end. + int l0SSTableCount = 0; + SSTableReader largestL0SSTable = null; + for (SSTableReader sstable : transaction.originals()) + { + if (sstable.getSSTableLevel() == 0) + { + l0SSTableCount++; + if (largestL0SSTable == null || sstable.onDiskLength() > largestL0SSTable.onDiskLength()) + largestL0SSTable = sstable; + } + } + // no point doing a L0 -> L{0,1} compaction if we have cancelled all L0 sstables + if (largestL0SSTable != null && l0SSTableCount > 1) + { + logger.info("Removing {} (level={}, size={}) from compaction {}", + largestL0SSTable, + largestL0SSTable.getSSTableLevel(), + largestL0SSTable.onDiskLength(), + transaction.opId()); + transaction.cancel(largestL0SSTable); + return true; + } + } + return false; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index d33d72c..1ceed1c 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -125,7 +125,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected void maybeSwitchWriter(DecoratedKey key) { if (!isInitialized) - switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()))); + switchCompactionLocation(getDirectories().getWriteableLocation(getExpectedWriteSize())); isInitialized = true; } @@ -156,4 +156,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa return directory; } + + protected long getExpectedWriteSize() + { + return cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); + } } diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 6d191f8..3eee398 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -88,7 +88,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter } averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1)); - switchCompactionLocation(getWriteDirectory(expectedWriteSize)); + switchCompactionLocation(getWriteDirectory(getExpectedWriteSize())); partitionsWritten = 0; sstablesWritten++; } @@ -109,4 +109,10 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter txn); sstableWriter.switchWriter(writer); } + + @Override + protected long getExpectedWriteSize() + { + return expectedWriteSize; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index b206498..d76381a 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -87,7 +87,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter { RowIndexEntry rie = sstableWriter.append(partition); if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize) - switchCompactionLocation(getWriteDirectory(expectedWriteSize)); + switchCompactionLocation(getWriteDirectory(getExpectedWriteSize())); return rie != null; } @@ -105,4 +105,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter sstableWriter.switchWriter(writer); } + + @Override + protected long getExpectedWriteSize() + { + return expectedWriteSize; + } } diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 796391c..77672d8 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -95,8 +95,8 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect { currentRatioIndex++; - currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]); - switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex]))); + currentBytesToWrite = getExpectedWriteSize(); + switchCompactionLocation(getWriteDirectory(currentBytesToWrite)); } return rie != null; } @@ -116,4 +116,9 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter sstableWriter.switchWriter(writer); } + + protected long getExpectedWriteSize() + { + return Math.round(totalSize * ratios[currentRatioIndex]); + } } diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java index a406290..1b47fc2 100644 --- a/test/unit/org/apache/cassandra/MockSchema.java +++ b/test/unit/org/apache/cassandra/MockSchema.java @@ -62,7 +62,7 @@ public class MockSchema public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1))); public static final IndexSummary indexSummary; - private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), RandomAccessReader.DEFAULT_BUFFER_SIZE, 0); + public static final File tempFile = temp("mocksegmentedfile"); public static Memtable memtable(ColumnFamilyStore cfs) { @@ -102,6 +102,7 @@ public class MockSchema { } } + SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(tempFile), RandomAccessReader.DEFAULT_BUFFER_SIZE, size); if (size > 0) { try diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 2cda2e8..5bbc931 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -17,31 +17,40 @@ */ package org.apache.cassandra.db.compaction; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; -import junit.framework.Assert; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.MockSchema; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.Util; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -55,9 +64,12 @@ import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) @@ -367,4 +379,147 @@ public class LeveledCompactionStrategyTest assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2)); assertFalse(repaired.manifest.getLevel(1).contains(sstable2)); } + + @Test + public void testReduceScopeL0L1() throws IOException + { + ColumnFamilyStore cfs = MockSchema.newCFS(); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "LeveledCompactionStrategy"); + localOptions.put("sstable_size_in_mb", "1"); + cfs.setCompactionParameters(localOptions); + List<SSTableReader> l1sstables = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + SSTableReader l1sstable = MockSchema.sstable(i, 1 * 1024 * 1024, cfs); + l1sstable.descriptor.getMetadataSerializer().mutateLevel(l1sstable.descriptor, 1); + l1sstable.reloadSSTableMetadata(); + l1sstables.add(l1sstable); + } + List<SSTableReader> l0sstables = new ArrayList<>(); + for (int i = 10; i < 20; i++) + l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs)); + + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, Iterables.concat(l0sstables, l1sstables))) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 1, 0, 1024*1024, false); + SSTableReader lastRemoved = null; + boolean removed = true; + for (int i = 0; i < l0sstables.size(); i++) + { + Set<SSTableReader> before = new HashSet<>(txn.originals()); + removed = task.reduceScopeForLimitedSpace(0); + SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null); + if (removed) + { + assertNotNull(removedSSTable); + assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength()); + assertEquals(0, removedSSTable.getSSTableLevel()); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l1after = sstables.right; + + assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1 + assertEquals(before.size() - 1, txn.originals().size()); + lastRemoved = removedSSTable; + } + else + { + assertNull(removedSSTable); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l0after = sstables.left; + Set<SSTableReader> l1after = sstables.right; + assertEquals(l1after, new HashSet<>(l1sstables)); // we don't touch L1 + assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left + } + } + assertFalse(removed); + } + } + + @Test + public void testReduceScopeL0() + { + + List<SSTableReader> l0sstables = new ArrayList<>(); + for (int i = 10; i < 20; i++) + l0sstables.add(MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs)); + + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, l0sstables)) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024*1024, false); + + SSTableReader lastRemoved = null; + boolean removed = true; + for (int i = 0; i < l0sstables.size(); i++) + { + Set<SSTableReader> before = new HashSet<>(txn.originals()); + removed = task.reduceScopeForLimitedSpace(0); + SSTableReader removedSSTable = Sets.difference(before, txn.originals()).stream().findFirst().orElse(null); + if (removed) + { + assertNotNull(removedSSTable); + assertTrue(lastRemoved == null || removedSSTable.onDiskLength() < lastRemoved.onDiskLength()); + assertEquals(0, removedSSTable.getSSTableLevel()); + assertEquals(before.size() - 1, txn.originals().size()); + lastRemoved = removedSSTable; + } + else + { + assertNull(removedSSTable); + Pair<Set<SSTableReader>, Set<SSTableReader>> sstables = groupByLevel(txn.originals()); + Set<SSTableReader> l0after = sstables.left; + assertEquals(1, l0after.size()); // and we stop reducing once there is a single sstable left + } + } + assertFalse(removed); + } + } + + @Test + public void testNoHighLevelReduction() throws IOException + { + List<SSTableReader> sstables = new ArrayList<>(); + int i = 1; + for (; i < 5; i++) + { + SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1); + sstable.reloadSSTableMetadata(); + sstables.add(sstable); + } + for (; i < 10; i++) + { + SSTableReader sstable = MockSchema.sstable(i, (i + 1) * 1024 * 1024, cfs); + sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2); + sstable.reloadSSTableMetadata(); + sstables.add(sstable); + } + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables)) + { + CompactionTask task = new LeveledCompactionTask(cfs, txn, 0, 0, 1024 * 1024, false); + assertFalse(task.reduceScopeForLimitedSpace(0)); + assertEquals(new HashSet<>(sstables), txn.originals()); + } + } + + private Pair<Set<SSTableReader>, Set<SSTableReader>> groupByLevel(Iterable<SSTableReader> sstables) + { + Set<SSTableReader> l1after = new HashSet<>(); + Set<SSTableReader> l0after = new HashSet<>(); + for (SSTableReader kept : sstables) + { + switch (kept.getSSTableLevel()) + { + case 0: + l0after.add(kept); + break; + case 1: + l1after.add(kept); + break; + default: + throw new RuntimeException("only l0 & l1 sstables"); + } + } + return Pair.create(l0after, l1after); + } } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java similarity index 84% rename from test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java rename to test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java index 68936f5..c25a7af 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/writers/CompactionAwareWriterTest.java @@ -15,22 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.db.compaction; +package org.apache.cassandra.db.compaction.writers; +import java.io.File; import java.nio.ByteBuffer; import java.util.*; import com.google.common.primitives.Longs; import org.junit.*; +import org.apache.cassandra.MockSchema; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; -import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; -import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter; -import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; -import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.FBUtilities; @@ -165,6 +166,41 @@ public class CompactionAwareWriterTest extends CQLTester cfs.truncateBlocking(); } + @Test + public void testMultiDatadirCheck() + { + createTable("create table %s (id int primary key)"); + Directories.DataDirectory [] dataDirs = new Directories.DataDirectory[] { + new MockDataDirectory(new File("/tmp/1")), + new MockDataDirectory(new File("/tmp/2")), + new MockDataDirectory(new File("/tmp/3")), + new MockDataDirectory(new File("/tmp/4")), + new MockDataDirectory(new File("/tmp/5")) + }; + Set<SSTableReader> sstables = new HashSet<>(); + for (int i = 0; i < 100; i++) + sstables.add(MockSchema.sstable(i, 1000, getCurrentColumnFamilyStore())); + + Directories dirs = new Directories(getCurrentColumnFamilyStore().metadata, dataDirs); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.COMPACTION, sstables); + CompactionAwareWriter writer = new MaxSSTableSizeWriter(getCurrentColumnFamilyStore(), dirs, txn, sstables, 2000, 1); + // init case + writer.maybeSwitchWriter(null); + } + + private static class MockDataDirectory extends Directories.DataDirectory + { + public MockDataDirectory(File location) + { + super(location); + } + + public long getAvailableSpace() + { + return 5000; + } + } + private int compact(ColumnFamilyStore cfs, LifecycleTransaction txn, CompactionAwareWriter writer) { assert txn.originals().size() == 1; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org