This is an automated email from the ASF dual-hosted git repository. mck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 5e8f7f5 Release StreamingTombstoneHistogramBuilder spool when switching writers 5e8f7f5 is described below commit 5e8f7f591dfec5a61d8eb2e9e977ec29f3a2bbe4 Author: Adam Holmberg <adam.holmb...@datastax.com> AuthorDate: Mon Dec 14 14:42:50 2020 -0600 Release StreamingTombstoneHistogramBuilder spool when switching writers patch by Adam Holmberg; reviewed by Berenguer Blasi, Mick Semb Wever for CASSANDRA-14834 --- CHANGES.txt | 1 + .../cassandra/db/compaction/CompactionTask.java | 6 - .../compaction/writers/CompactionAwareWriter.java | 11 -- .../writers/MajorLeveledCompactionWriter.java | 12 -- .../compaction/writers/MaxSSTableSizeWriter.java | 13 -- .../SplittingSizeTieredCompactionWriter.java | 2 +- .../cassandra/io/sstable/SSTableRewriter.java | 31 ++--- .../cassandra/io/sstable/format/SSTableWriter.java | 5 + .../io/sstable/metadata/MetadataCollector.java | 8 ++ .../StreamingTombstoneHistogramBuilder.java | 31 +++-- .../cassandra/io/sstable/SSTableRewriterTest.java | 139 +++++++++++++++------ 11 files changed, 152 insertions(+), 107 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1c63268..7699940 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta5 + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834) * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 764ad5b..13c9725 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -59,12 +59,6 @@ public class CompactionTask extends AbstractCompactionTask this(cfs, txn, gcBefore, false); } - @Deprecated - public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline, boolean keepOriginals) - { - this(cfs, txn, gcBefore, keepOriginals); - } - public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean keepOriginals) { super(cfs, txn); diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index c1ae9ec..d363dcf 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -66,17 +66,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa private final List<PartitionPosition> diskBoundaries; private int locationIndex; - @Deprecated - public CompactionAwareWriter(ColumnFamilyStore cfs, - Directories directories, - LifecycleTransaction txn, - Set<SSTableReader> nonExpiredSSTables, - boolean offline, - boolean keepOriginals) - { - this(cfs, directories, txn, nonExpiredSSTables, keepOriginals); - } - public CompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 2b93eb4..1c53600 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -51,18 +51,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false); } - @Deprecated - public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, - Directories directories, - LifecycleTransaction txn, - Set<SSTableReader> nonExpiredSSTables, - long maxSSTableSize, - boolean offline, - boolean keepOriginals) - { - this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, keepOriginals); - } - @SuppressWarnings("resource") public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Directories directories, diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index df7eeaf..915f96b 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -48,19 +48,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false); } - @Deprecated - public MaxSSTableSizeWriter(ColumnFamilyStore cfs, - Directories directories, - LifecycleTransaction txn, - Set<SSTableReader> nonExpiredSSTables, - long maxSSTableSize, - int level, - boolean offline, - boolean keepOriginals) - { - this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, keepOriginals); - } - public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 7533f1d..d29061c 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -58,7 +58,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable) { - super(cfs, directories, txn, nonExpiredSSTables, false, false); + super(cfs, directories, txn, nonExpiredSSTables, false); this.allSSTables = txn.originals(); totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); double[] potentialRatios = new double[20]; diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index fb3aa2d..a3d5ae9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -66,6 +66,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme private final List<SSTableWriter> writers = new ArrayList<>(); private final boolean keepOriginals; // true if we do not want to obsolete the originals + private final boolean eagerWriterMetaRelease; // true if the writer metadata should be released when switch is called private SSTableWriter writer; private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>(); @@ -74,44 +75,33 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme private boolean throwEarly, throwLate; @Deprecated - public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline) - { - this(transaction, maxAge, isOffline, true); - } - @Deprecated - public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly) + public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals) { - this(transaction, maxAge, calculateOpenInterval(shouldOpenEarly), false); + this(transaction, maxAge, preemptiveOpenInterval, keepOriginals, false); } - @VisibleForTesting - public SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals) + SSTableRewriter(ILifecycleTransaction transaction, long maxAge, long preemptiveOpenInterval, boolean keepOriginals, boolean eagerWriterMetaRelease) { this.transaction = transaction; this.maxAge = maxAge; - this.keepOriginals = keepOriginals; this.preemptiveOpenInterval = preemptiveOpenInterval; - } - - @Deprecated - public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge, boolean isOffline) - { - return constructKeepingOriginals(transaction, keepOriginals, maxAge); + this.keepOriginals = keepOriginals; + this.eagerWriterMetaRelease = eagerWriterMetaRelease; } public static SSTableRewriter constructKeepingOriginals(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge) { - return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals); + return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(true), keepOriginals, true); } public static SSTableRewriter constructWithoutEarlyOpening(ILifecycleTransaction transaction, boolean keepOriginals, long maxAge) { - return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals); + return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(false), keepOriginals, true); } public static SSTableRewriter construct(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean keepOriginals, long maxAge) { - return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals); + return new SSTableRewriter(transaction, maxAge, calculateOpenInterval(cfs.supportsEarlyOpen()), keepOriginals, true); } private static long calculateOpenInterval(boolean shouldOpenEarly) @@ -303,6 +293,9 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme if (newWriter != null) writers.add(newWriter.setMaxDataAge(maxAge)); + if (eagerWriterMetaRelease && writer != null) + writer.releaseMetadataOverhead(); + if (writer == null || writer.getFilePointer() == 0) { if (writer != null) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index f54bc03..1dbfcdb 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -325,6 +325,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS); } + public void releaseMetadataOverhead() + { + metadataCollector.release(); + } + public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components) { for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY))) diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 65d534e..0d49ea1 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -272,6 +272,14 @@ public class MetadataCollector implements PartitionStatisticsCollector return components; } + /** + * Release large memory objects while keeping metrics intact + */ + public void release() + { + estimatedTombstoneDropTime.releaseBuffers(); + } + private static List<ByteBuffer> makeList(ByteBuffer[] values) { // In most case, l will be the same size than values, but it's possible for it to be smaller diff --git a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java index eda88bc..76630ec 100755 --- a/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java +++ b/src/java/org/apache/cassandra/utils/streamhist/StreamingTombstoneHistogramBuilder.java @@ -40,12 +40,12 @@ import org.apache.cassandra.db.rows.Cell; * <ol> * <li>If point <i>p</i> is already exists in collection, add <i>m</i> to recorded value of point <i>p</i> </li> * <li>If there is no point <i>p</i> in the collection, add point <i>p</i> with weight <i>m</i> </li> - * <li>If point was added and collection size became lorger than maxBinSize:</li> + * <li>If point was added and collection size became larger than maxBinSize:</li> * </ol> * * <ol type="a"> * <li>Find nearest points <i>p1</i> and <i>p2</i> in the collection </li> - * <li>Replace theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li> + * <li>Replace these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2)</i></li> * </ol> * * <p> @@ -54,7 +54,7 @@ import org.apache.cassandra.db.rows.Cell; * <li>Spool: big map that saves from excessively merging of small bin. This map can contains up to maxSpoolSize points and accumulate weight from same points. * For example, if spoolSize=100, binSize=10 and there are only 50 different points. it will be only 40 merges regardless how many points will be added.</li> * <li>Spool is organized as open-addressing primitive hash map where odd elements are points and event elements are values. - * Spool can not resize => when number of collisions became bigger than threashold or size became large that <i>array_size/2</i> Spool is drained to bin</li> + * Spool can not resize => when number of collisions became bigger than threshold or size became large that <i>array_size/2</i> Spool is drained to bin</li> * <li>Bin is organized as sorted arrays. It reduces garbage collection pressure and allows to find elements in log(binSize) time via binary search</li> * <li>To use existing Arrays.binarySearch <i></>{point, values}</i> in bin pairs is packed in one long</li> * </ol> @@ -69,7 +69,7 @@ public class StreamingTombstoneHistogramBuilder private final DataHolder bin; // Keep a second, larger buffer to spool data in, before finalizing it into `bin` - private final Spool spool; + private Spool spool; // voluntarily give up resolution for speed private final int roundSeconds; @@ -98,6 +98,7 @@ public class StreamingTombstoneHistogramBuilder */ public void update(int point, int value) { + assert spool != null: "update is being called after releaseBuffers. This could be functionally okay, but this assertion is a canary to alert about unintended use before it is necessary."; point = ceilKey(point, roundSeconds); if (spool.capacity > 0) @@ -120,8 +121,22 @@ public class StreamingTombstoneHistogramBuilder */ public void flushHistogram() { - spool.forEach(this::flushValue); - spool.clear(); + Spool spool = this.spool; + if (spool != null) + { + spool.forEach(this::flushValue); + spool.clear(); + } + } + + /** + * Release inner spool buffers. Histogram remains readable and writable, but with lesser performance. + * Not intended for use before finalization. + */ + public void releaseBuffers() + { + flushHistogram(); + spool = null; } private void flushValue(int key, int spoolValue) @@ -135,7 +150,7 @@ public class StreamingTombstoneHistogramBuilder } /** - * Creates a 'finished' snapshot of the current state of the historgram, but leaves this builder instance + * Creates a 'finished' snapshot of the current state of the histogram, but leaves this builder instance * open for subsequent additions to the histograms. Basically, this allows us to have some degree of sanity * wrt sstable early open. */ @@ -220,7 +235,7 @@ public class StreamingTombstoneHistogramBuilder /** * Finds nearest points <i>p1</i> and <i>p2</i> in the collection - * Replaces theese two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2) + * Replaces these two points with one weighted point <i>p3 = (p1*m1+p2*m2)/(p1+p2) */ @VisibleForTesting void mergeNearestPoints() diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 7c47c8b..1895653 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -19,24 +19,21 @@ package org.apache.cassandra.io.sstable; import java.io.File; -import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Test; import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.SerializationHeader; @@ -49,25 +46,27 @@ import org.apache.cassandra.db.compaction.CompactionIterator; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.SSTableSplitter; import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SSTableRewriterTest extends SSTableWriterTestBase { @Test - public void basicTest() throws InterruptedException + public void basicTest() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -106,7 +105,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase truncate(cfs); } @Test - public void basicTest2() throws InterruptedException + public void basicTest2() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -120,7 +119,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase int nowInSec = FBUtilities.nowInSeconds(); try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { @@ -138,7 +137,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void getPositionsTest() throws InterruptedException + public void getPositionsTest() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -153,7 +152,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase boolean checked = false; try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { @@ -194,7 +193,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testNumberOfFilesAndSizes() throws Exception + public void testNumberOfFilesAndSizes() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -211,7 +210,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); @@ -250,7 +249,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testNumberOfFiles_dont_clean_readers() throws Exception + public void testNumberOfFiles_dont_clean_readers() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -266,7 +265,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); @@ -401,7 +400,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase LifecycleTransaction txn); } - private void testNumberOfFiles_abort(RewriterTest test) throws Exception + private void testNumberOfFiles_abort(RewriterTest test) { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -417,7 +416,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false)) + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true)) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); test.run(scanner, controller, s, cfs, rewriter, txn); @@ -434,7 +433,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testNumberOfFiles_finish_empty_new_writer() throws Exception + public void testNumberOfFiles_finish_empty_new_writer() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -449,7 +448,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); @@ -479,7 +478,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testNumberOfFiles_truncate() throws Exception + public void testNumberOfFiles_truncate() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -495,7 +494,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); @@ -510,7 +509,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } } - sstables = rewriter.finish(); + rewriter.finish(); } LifecycleTransaction.waitForDeletions(); @@ -519,7 +518,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testSmallFiles() throws Exception + public void testSmallFiles() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -535,7 +534,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = s.getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())) { rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn)); @@ -561,7 +560,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testSSTableSplit() throws InterruptedException + public void testSSTableSplit() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -603,12 +602,12 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testAbort2() throws Exception + public void testAbort2() { testAbortHelper(true, false); } - private void testAbortHelper(boolean earlyException, boolean offline) throws Exception + private void testAbortHelper(boolean earlyException, boolean offline) { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -621,7 +620,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting) : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 100, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { @@ -711,7 +710,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); CompactionController controller = new CompactionController(cfs, compacting, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { @@ -735,7 +734,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } @Test - public void testCanonicalView() throws Exception + public void testCanonicalView() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -749,7 +748,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase try (ISSTableScanner scanner = sstables.iterator().next().getScanner(); CompactionController controller = new CompactionController(cfs, sstables, 0); LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false); + SSTableRewriter writer = new SSTableRewriter(txn, 1000, 10000000, false, true); CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()) ) { @@ -775,11 +774,9 @@ public class SSTableRewriterTest extends SSTableWriterTestBase /** * emulates anticompaction - writing from one source sstable to two new sstables - * - * @throws IOException */ @Test - public void testTwoWriters() throws IOException + public void testTwoWriters() { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); @@ -853,6 +850,74 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } + /** + * tests SSTableRewriter ctor arg controlling whether writers metadata buffers are released. + * Verifies that writers trip an assert when updated after cleared on switch + * + * CASSANDRA-14834 + */ + @Test + public void testWriterClearing() + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); + + // Can't update a writer that is eagerly cleared on switch + boolean eagerWriterMetaRelease = true; + try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease) + ) + { + SSTableWriter firstWriter = getWriter(cfs, dir, txn); + rewriter.switchWriter(firstWriter); + rewriter.switchWriter(getWriter(cfs, dir, txn)); + try + { + UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class); + when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0)); + when(uri.partitionKey()).thenReturn(bopKeyFromInt(0)); + // should not be able to append after buffer release on switch + firstWriter.append(uri); + fail("Expected AssertionError was not thrown."); + } + catch(AssertionError ae) { + if (!ae.getMessage().contains("update is being called after releaseBuffers")) + throw ae; + } + } + + // Can update a writer that is not eagerly cleared on switch + eagerWriterMetaRelease = false; + try (LifecycleTransaction txn = cfs.getTracker().tryModify(new HashSet<>(), OperationType.UNKNOWN); + SSTableRewriter rewriter = new SSTableRewriter(txn, 1000, 1000000, false, eagerWriterMetaRelease) + ) + { + SSTableWriter firstWriter = getWriter(cfs, dir, txn); + rewriter.switchWriter(firstWriter); + + // At least one write so it's not aborted when switched out. + UnfilteredRowIterator uri = mock(UnfilteredRowIterator.class); + when(uri.partitionLevelDeletion()).thenReturn(new DeletionTime(0,0)); + when(uri.partitionKey()).thenReturn(bopKeyFromInt(0)); + rewriter.append(uri); + + rewriter.switchWriter(getWriter(cfs, dir, txn)); + + // should be able to append after switch, and assert is not tripped + when(uri.partitionKey()).thenReturn(bopKeyFromInt(1)); + firstWriter.append(uri); + } + } + + static DecoratedKey bopKeyFromInt(int i) + { + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(i); + bb.rewind(); + return ByteOrderedPartitioner.instance.decorateKey(bb); + } + private void validateKeys(Keyspace ks) { for (int i = 0; i < 100; i++) @@ -865,10 +930,10 @@ public class SSTableRewriterTest extends SSTableWriterTestBase public static SSTableReader writeFile(ColumnFamilyStore cfs, int count) { - return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null); + return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100), null); } - public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize) + public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount) { int i = 0; Set<SSTableReader> result = new LinkedHashSet<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org