Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
	test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/180130a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/180130a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/180130a8 Branch: refs/heads/cassandra-2.2 Commit: 180130a8e59b2848ad843d74c09fabfa5e82eab1 Parents: 7d06762 eaeabff Author: Marcus Eriksson <marc...@apache.org> Authored: Fri May 29 09:57:54 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Fri May 29 09:57:54 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 4 +- .../cassandra/io/sstable/SSTableRewriter.java | 41 +++++----- .../unit/org/apache/cassandra/db/ScrubTest.java | 6 +- .../io/sstable/SSTableRewriterTest.java | 82 ++++++++++++++------ 5 files changed, 88 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 618f063,0e759b7..262f8c8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,16 -1,5 +1,17 @@@ -2.1.6 +2.2 + * Disallow frozen<> types in function arguments and return types for + clarity (CASSANDRA-9411) + * Static Analysis to warn on unsafe use of Autocloseable instances (CASSANDRA-9431) + * Update commitlog archiving examples now that commitlog segments are + not recycled (CASSANDRA-9350) + * Extend Transactional API to sstable lifecycle management (CASSANDRA-8568) + * (cqlsh) Add support for native protocol 4 (CASSANDRA-9399) + * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409) + * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429) + * Add ability to stop compaction by ID (CASSANDRA-7207) + * Let CassandraVersion handle SNAPSHOT version (CASSANDRA-9438) +Merged from 2.1: + * 
Avoid getting unreadable keys during anticompaction (CASSANDRA-9508) * (cqlsh) Better float precision by default (CASSANDRA-9224) * Improve estimated row count (CASSANDRA-9107) * Optimize range tombstone memory footprint (CASSANDRA-8603) http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 26dab7c,2f3f7df..a2783da --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -1187,90 -1060,68 +1187,90 @@@ public class CompactionManager implemen if (!new File(sstable.getFilename()).exists()) { logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable); + i.remove(); continue; } + if (groupMaxDataAge < sstable.maxDataAge) + groupMaxDataAge = sstable.maxDataAge; + } - logger.info("Anticompacting {}", sstable); - Set<SSTableReader> sstableAsSet = new HashSet<>(); - sstableAsSet.add(sstable); + if (anticompactionGroup.originals().size() == 0) + { + logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); + return 0; + } - File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); - SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false); + logger.info("Anticompacting {}", anticompactionGroup); + Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); + + File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, 
OperationType.ANTICOMPACTION)); + long repairedKeyCount = 0; + long unrepairedKeyCount = 0; + AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false); ++ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); ++ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false); + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); + CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) + { + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable))); - CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) - { - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys()); - repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable)); + repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); + unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); 
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller); - Iterator<AbstractCompactedRow> iter = ci.iterator(); - metrics.beginCompaction(ci); - try + CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()); + metrics.beginCompaction(ci); + try + { + @SuppressWarnings("resource") + CloseableIterator<AbstractCompactedRow> iter = ci.iterator(); + while (iter.hasNext()) { - while (iter.hasNext()) + @SuppressWarnings("resource") + AbstractCompactedRow row = iter.next(); + // if current range from sstable is repaired, save it into the new repaired sstable + if (Range.isInRanges(row.key.getToken(), ranges)) { - AbstractCompactedRow row = iter.next(); - // if current range from sstable is repaired, save it into the new repaired sstable - if (Range.isInRanges(row.key.getToken(), ranges)) - { - repairedSSTableWriter.append(row); - repairedKeyCount++; - } - // otherwise save into the new 'non-repaired' table - else - { - unRepairedSSTableWriter.append(row); - unrepairedKeyCount++; - } + repairedSSTableWriter.append(row); + repairedKeyCount++; + } + // otherwise save into the new 'non-repaired' table + else + { + unRepairedSSTableWriter.append(row); + unrepairedKeyCount++; } } - finally - { - metrics.finishCompaction(ci); - } - anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt)); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE)); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); } - catch (Throwable e) + finally { - JVMStabilityInspector.inspectThrowable(e); - logger.error("Error anticompacting " + sstable, e); - repairedSSTableWriter.abort(); - unRepairedSSTableWriter.abort(); + metrics.finishCompaction(ci); } - } - String format = "Repaired {} keys of {} for {}/{}"; - 
logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName()); - String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size()); - return anticompactedSSTables; + List<SSTableReader> anticompactedSSTables = new ArrayList<>(); + // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, + // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API + // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted. + anticompactionGroup.permitRedundantTransitions(); + repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit(); + unRepairedSSTableWriter.prepareToCommit(); + anticompactedSSTables.addAll(repairedSSTableWriter.finished()); + anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); + repairedSSTableWriter.commit(); + unRepairedSSTableWriter.commit(); + + logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, + repairedKeyCount + unrepairedKeyCount, + cfs.keyspace.getName(), + cfs.getColumnFamilyName(), + anticompactionGroup); + return anticompactedSSTables.size(); + } + catch (Throwable e) + { + JVMStabilityInspector.inspectThrowable(e); + logger.error("Error anticompacting " + anticompactionGroup, e); + } + return 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 8029075,824e58b..011c7d9 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ 
b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@@ -47,56 -49,68 +47,59 @@@ import org.apache.cassandra.utils.concu * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished * successfully. */ -public class SSTableRewriter +public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional { - private static long preemptiveOpenInterval; - static - { - long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20); - if (interval < 0) - interval = Long.MAX_VALUE; - preemptiveOpenInterval = interval; - } - - @VisibleForTesting - public static void overrideOpenInterval(long size) - { - preemptiveOpenInterval = size; - } - private final DataTracker dataTracker; + @VisibleForTesting - public static long getOpenInterval() - { - return preemptiveOpenInterval; - } ++ public static boolean disableEarlyOpeningForTests = false; + private final ColumnFamilyStore cfs; - + private final long preemptiveOpenInterval; private final long maxAge; - private final List<SSTableReader> finished = new ArrayList<>(); - private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced) - private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting + private long repairedAt = -1; + // the set of final readers we will expose on commit + private final LifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced) + private final List<SSTableReader> preparedForCommit = new ArrayList<>(); private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting - private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last 
(re)opened at - private final List<SSTableReader> finishedReaders = new ArrayList<>(); - private final Queue<Finished> finishedEarly = new ArrayDeque<>(); - // as writers are closed from finishedEarly, their last readers are moved - // into discard, so that abort can cleanup after us safely - private final List<SSTableReader> discard = new ArrayList<>(); - private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker) + private final List<SSTableWriter> writers = new ArrayList<>(); + private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker) private SSTableWriter writer; private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>(); - private State state = State.WORKING; - private static enum State - { - WORKING, FINISHED, ABORTED - } + // for testing (TODO: remove when have byteman setup) + private boolean throwEarly, throwLate; - public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline) + public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline) { - this(cfs, rewriting, maxAge, isOffline, true); ++ this(cfs, transaction, maxAge, isOffline, true); + } + - public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, boolean shouldOpenEarly) ++ public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline, boolean shouldOpenEarly) + { - this(cfs, rewriting, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly)); ++ this(cfs, transaction, maxAge, isOffline, calculateOpenInterval(shouldOpenEarly)); + } + + @VisibleForTesting - public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline, long preemptiveOpenInterval) ++ public SSTableRewriter(ColumnFamilyStore cfs, 
LifecycleTransaction transaction, long maxAge, boolean isOffline, long preemptiveOpenInterval) + { - this.rewriting = rewriting; - for (SSTableReader sstable : rewriting) - { - originalStarts.put(sstable.descriptor, sstable.first); + this.transaction = transaction; + for (SSTableReader sstable : this.transaction.originals()) fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename())); - } - this.dataTracker = cfs.getDataTracker(); this.cfs = cfs; this.maxAge = maxAge; this.isOffline = isOffline; + this.preemptiveOpenInterval = preemptiveOpenInterval; + } + + private static long calculateOpenInterval(boolean shouldOpenEarly) + { + long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20); - if (!shouldOpenEarly || interval < 0) ++ if (disableEarlyOpeningForTests || !shouldOpenEarly || interval < 0) + interval = Long.MAX_VALUE; + return interval; } public SSTableWriter currentWriter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java index dbbce9e,028cf6c..d4579af --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@@ -273,21 -234,6 +273,21 @@@ public class ScrubTes } @Test + public void testScrubCorruptedCounterRowNoEarlyOpen() throws IOException, WriteTimeoutException + { - long oldOpenVal = SSTableRewriter.getOpenInterval(); ++ boolean oldDisabledVal = SSTableRewriter.disableEarlyOpeningForTests; + try + { - SSTableRewriter.overrideOpenInterval(Long.MAX_VALUE); ++ SSTableRewriter.disableEarlyOpeningForTests = true; + testScrubCorruptedCounterRow(); + } + finally + { - SSTableRewriter.overrideOpenInterval(oldOpenVal); ++ SSTableRewriter.disableEarlyOpeningForTests = oldDisabledVal; + } + } + + @Test public void testScrubDeletedRow() throws ExecutionException, 
InterruptedException { CompactionManager.instance.disableAutoCompaction(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/180130a8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 265bb6a,e1b001e..9e1cb91 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@@ -130,10 -105,8 +131,10 @@@ public class SSTableRewriterTest extend cfs.addSSTable(s); Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) ++ + try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000);) { ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@@ -164,11 -136,9 +165,11 @@@ cfs.addSSTable(s); Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables()); assertEquals(1, sstables.size()); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); ++ boolean checked = false; - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);) + try (AbstractCompactionStrategy.ScannerList scanners = 
cfs.getCompactionStrategy().getScanners(sstables); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { ISSTableScanner scanner = scanners.scanners.get(0); CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); @@@ -261,19 -230,15 +262,18 @@@ SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); - long startStorageMetricsLoad = StorageMetrics.load.count(); + long startStorageMetricsLoad = StorageMetrics.load.getCount(); + long sBytesOnDisk = s.bytesOnDisk(); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + List<SSTableReader> sstables; int files = 1; try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -315,16 -280,13 +315,15 @@@ cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + List<SSTableReader> 
sstables; int files = 1; try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -437,22 -407,24 +436,21 @@@ DecoratedKey origFirst = s.first; DecoratedKey origLast = s.last; - long startSize = cfs.metric.liveDiskSpaceUsed.count(); + long startSize = cfs.metric.liveDiskSpaceUsed.getCount(); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); test.run(scanner, controller, s, cfs, rewriter); } - catch (Throwable t) - { - rewriter.abort(); - throw t; - } - Thread.sleep(1000); - assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count()); + SSTableDeletingTask.waitForDeletions(); + + assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount()); assertEquals(1, 
cfs.getSSTables().size()); assertFileCounts(s.descriptor.directory.list(), 0, 0); assertEquals(cfs.getSSTables().iterator().next().first, origFirst); @@@ -471,15 -443,13 +469,14 @@@ cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); int files = 1; try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -516,16 -491,13 +513,15 @@@ SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 10000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + List<SSTableReader> sstables; int files = 1; try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000)) { + 
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -558,16 -535,13 +554,15 @@@ SSTableReader s = writeFile(cfs, 400); cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(1000000); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1000000); - rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); + List<SSTableReader> sstables; int files = 1; try (ISSTableScanner scanner = s.getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);) ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1000000);) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while(scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -648,15 -619,13 +643,14 @@@ if (!offline) cfs.addSSTable(s); Set<SSTableReader> compacting = Sets.newHashSet(s); - SSTableRewriter.overrideOpenInterval(10000000); - cfs.getDataTracker().markCompacting(compacting); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline, 10000000); - SSTableWriter w = getWriter(cfs, s.descriptor.directory); - rewriter.switchWriter(w); try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = offline ? 
LifecycleTransaction.offline(OperationType.UNKNOWN, compacting) + : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline); ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline, 10000000); + ) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while (scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -729,16 -695,15 +723,15 @@@ SSTableReader s = cfs.getSSTables().iterator().next(); Set<SSTableReader> compacting = new HashSet<>(); compacting.add(s); - cfs.getDataTracker().markCompacting(compacting); - SSTableRewriter.overrideOpenInterval(1); - SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false, 1); - SSTableWriter w = getWriter(cfs, s.descriptor.directory); - rewriter.switchWriter(w); int keyCount = 0; try (ISSTableScanner scanner = compacting.iterator().next().getScanner(); - CompactionController controller = new CompactionController(cfs, compacting, 0)) + CompactionController controller = new CompactionController(cfs, compacting, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN); - SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false); ++ SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1); + ) { + rewriter.switchWriter(getWriter(cfs, s.descriptor.directory)); while (scanner.hasNext()) { rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next()))); @@@ -765,16 -738,14 +758,15 @@@ SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); - Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting()); + Set<SSTableReader> sstables = Sets.newHashSet(s); assertEquals(1, sstables.size()); - SSTableRewriter.overrideOpenInterval(10000000); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, 10000000); boolean checked = false; - try 
(AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) + try (ISSTableScanner scanner = sstables.iterator().next().getScanner(); + CompactionController controller = new CompactionController(cfs, sstables, 0); + LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); - SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false); ++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, 10000000); + ) { - ISSTableScanner scanner = scanners.scanners.get(0); - CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); while (scanner.hasNext()) { @@@ -796,6 -770,55 +788,52 @@@ validateCFS(cfs); } + /** + * emulates anticompaction - writing from one source sstable to two new sstables + * + * @throws IOException + */ + @Test + public void testTwoWriters() throws IOException + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); + cfs.truncateBlocking(); + + SSTableReader s = writeFile(cfs, 1000); + cfs.addSSTable(s); - Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting()); ++ Set<SSTableReader> sstables = Sets.newHashSet(s); + assertEquals(1, sstables.size()); - SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false, false); - SSTableRewriter writer2 = new SSTableRewriter(cfs, sstables, 1000, false, false); - try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables)) ++ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables); ++ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN); ++ SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false, false); ++ SSTableRewriter writer2 = new SSTableRewriter(cfs, 
txn, 1000, false, false)) + { + ISSTableScanner scanner = scanners.scanners.get(0); + CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis())); + writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory)); + while (scanner.hasNext()) + { + AbstractCompactedRow row = new LazilyCompactedRow(controller, Collections.singletonList(scanner.next())); + + if (writer.currentWriter().getFilePointer() < 15000000) + writer.append(row); + else + writer2.append(row); + } + for (int i = 0; i < 5000; i++) + { + DecoratedKey key = Util.dk(ByteBufferUtil.bytes(i)); + ColumnFamily cf = Util.getColumnFamily(keyspace, key, CF); + assertTrue(cf != null); + } + } - writer.abort(); - writer2.abort(); - cfs.getDataTracker().unmarkCompacting(sstables); - cfs.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); ++ truncateCF(); + validateCFS(cfs); + } + + private void validateKeys(Keyspace ks) { for (int i = 0; i < 100; i++)