Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 528500090 -> 605bcdcf1 refs/heads/trunk 4f51341b5 -> e8a5327bb
Fix split and scrub tool sstable cleanup. Follow-up to CASSANDRA-9978. Patch by stefania; reviewed by benedict for CASSANDRA-7066. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/605bcdcf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/605bcdcf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/605bcdcf Branch: refs/heads/cassandra-3.0 Commit: 605bcdcf11f2238d6d3d95b6281c9e38cf56e533 Parents: 5285000 Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Wed Aug 5 15:32:55 2015 +0800 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Aug 7 10:00:56 2015 +0200 ---------------------------------------------------------------------- .../compaction/AbstractCompactionStrategy.java | 2 +- .../cassandra/db/compaction/CompactionTask.java | 11 +++++-- .../DateTieredCompactionStrategy.java | 6 ++-- .../db/compaction/LeveledCompactionTask.java | 6 ++-- .../db/compaction/SSTableSplitter.java | 4 +-- .../cassandra/db/compaction/Scrubber.java | 32 ++++++++------------ .../SizeTieredCompactionStrategy.java | 12 ++++---- .../writers/CompactionAwareWriter.java | 12 ++++++-- .../writers/DefaultCompactionWriter.java | 9 ++++-- .../writers/MajorLeveledCompactionWriter.java | 17 +++++++++-- .../writers/MaxSSTableSizeWriter.java | 19 ++++++++++-- .../SplittingSizeTieredCompactionWriter.java | 2 +- .../cassandra/db/lifecycle/TransactionLogs.java | 2 +- .../cassandra/io/sstable/SSTableTxnWriter.java | 6 ++-- .../cassandra/tools/StandaloneSplitter.java | 7 +---- .../db/compaction/LongCompactionsTest.java | 2 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 7 +++-- .../compaction/CompactionAwareWriterTest.java | 6 ++-- .../io/sstable/BigTableWriterTest.java | 13 ++++++-- .../io/sstable/SSTableRewriterTest.java | 2 -- 20 files changed, 110 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 379d3de..4279f6e 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -180,7 +180,7 @@ public abstract class AbstractCompactionStrategy public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final int gcBefore, long maxSSTableBytes) { - return new CompactionTask(cfs, txn, gcBefore, false); + return new CompactionTask(cfs, txn, gcBefore); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 7897a1a..0bd6aae 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -52,14 +52,21 @@ public class CompactionTask extends AbstractCompactionTask protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); protected final int gcBefore; private final boolean offline; + private final boolean keepOriginals; protected static long totalBytesCompacted = 0; private CompactionExecutorStatsCollector collector; - public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline) + public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore) + { + this(cfs, txn, gcBefore, false, false); + } + + public 
CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline, boolean keepOriginals) { super(cfs, txn); this.gcBefore = gcBefore; this.offline = offline; + this.keepOriginals = keepOriginals; } public static synchronized long addToTotalBytesCompacted(long bytesCompacted) @@ -224,7 +231,7 @@ public class CompactionTask extends AbstractCompactionTask LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables) { - return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline); + return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals); } public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize) http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 8fa3b8f..1d1faf5 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -71,7 +71,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); if (modifier != null) - return new CompactionTask(cfs, modifier, gcBefore, false); + return new CompactionTask(cfs, modifier, gcBefore); } } @@ -372,7 +372,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (modifier == null) return null; - return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, modifier, gcBefore, false)); + return Arrays.<AbstractCompactionTask>asList(new 
CompactionTask(cfs, modifier, gcBefore)); } @Override @@ -388,7 +388,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy return null; } - return new CompactionTask(cfs, modifier, gcBefore, false).setUserDefined(true); + return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true); } public int getEstimatedRemainingTasks() http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index d3d56ac..11d113d 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -34,7 +34,7 @@ public class LeveledCompactionTask extends CompactionTask public LeveledCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level, int gcBefore, long maxSSTableBytes, boolean majorCompaction) { - super(cfs, txn, gcBefore, false); + super(cfs, txn, gcBefore); this.level = level; this.maxSSTableBytes = maxSSTableBytes; this.majorCompaction = majorCompaction; @@ -46,8 +46,8 @@ public class LeveledCompactionTask extends CompactionTask Set<SSTableReader> nonExpiredSSTables) { if (majorCompaction) - return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false); - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false); + return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index 8f382ea..1944364 100644 --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@ -60,7 +60,7 @@ public class SSTableSplitter { public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB) { - super(cfs, transaction, CompactionManager.NO_GC, true); + super(cfs, transaction, CompactionManager.NO_GC, true, false); this.sstableSizeInMB = sstableSizeInMB; if (sstableSizeInMB <= 0) @@ -78,7 +78,7 @@ public class SSTableSplitter { LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 891fac8..f9e9e71 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -60,9 +60,6 @@ public class Scrubber implements Closeable private final boolean isOffline; - private SSTableReader newSstable; - private SSTableReader newInOrderSstable; - private int goodRows; private int badRows; private int emptyRows; @@ -152,9 +149,10 @@ public class Scrubber implements Closeable public void scrub() { + List<SSTableReader> finished = new ArrayList<>(); + boolean completed = false; 
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); - int nowInSec = FBUtilities.nowInSeconds(); - try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline).keepOriginals(isOffline)) + try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline)) { nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null; if (indexAvailable()) @@ -296,6 +294,7 @@ public class Scrubber implements Closeable { // out of order rows, but no bad rows found - we can keep our repairedAt time long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt; + SSTableReader newInOrderSstable; try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable, transaction)) { for (Partition partition : outOfOrder) @@ -303,20 +302,25 @@ public class Scrubber implements Closeable newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true); } transaction.update(newInOrderSstable, false); + finished.add(newInOrderSstable); outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrder.size(), sstable, newInOrderSstable)); } // finish obsoletes the old sstable - List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish(); - if (!finished.isEmpty()) - newSstable = finished.get(0); + finished.addAll(writer.setRepairedAt(badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish()); + completed = true; } catch (IOException e) { throw Throwables.propagate(e); } + finally + { + if (transaction.isOffline()) + finished.forEach(sstable -> sstable.selfRef().release()); + } - if (newSstable == null) + if (completed) { if (badRows > 0) outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); @@ -384,16 +388,6 @@ public class Scrubber implements Closeable outOfOrder.add(ArrayBackedPartition.create(iterator)); } - public SSTableReader getNewSSTable() - { - return newSstable; - } - - public SSTableReader getNewInOrderSSTable() - { - return newInOrderSstable; - } - private void throwIfFatal(Throwable th) { if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 09d40c8..2353aa3 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -184,7 +184,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION); if (transaction != null) - return new CompactionTask(cfs, transaction, gcBefore, false); + return new CompactionTask(cfs, transaction, gcBefore); } } @@ -198,8 +198,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy if (txn == null) return null; if 
(splitOutput) - return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, txn, gcBefore, false)); - return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false)); + return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, txn, gcBefore)); + return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore)); } @SuppressWarnings("resource") @@ -214,7 +214,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy return null; } - return new CompactionTask(cfs, transaction, gcBefore, false).setUserDefined(true); + return new CompactionTask(cfs, transaction, gcBefore).setUserDefined(true); } public int getEstimatedRemainingTasks() @@ -335,9 +335,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy private static class SplittingCompactionTask extends CompactionTask { - public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline) + public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore) { - super(cfs, txn, gcBefore, offline); + super(cfs, txn, gcBefore); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index f8c73d3..50e5a96 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -48,8 +48,16 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, + Set<SSTableReader> 
nonExpiredSSTables) + { + this(cfs, txn, nonExpiredSSTables, false, false); + } + + public CompactionAwareWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, - boolean offline) + boolean offline, + boolean keepOriginals) { this.cfs = cfs; this.nonExpiredSSTables = nonExpiredSSTables; @@ -57,7 +65,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); this.txn = txn; - this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(offline); + this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 53dad55..eb55d20 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -40,10 +40,15 @@ public class DefaultCompactionWriter extends CompactionAwareWriter { protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class); + public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) + { + this(cfs, txn, nonExpiredSSTables, false, false); + } + @SuppressWarnings("resource") - public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline) + public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, 
Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals) { - super(cfs, txn, nonExpiredSSTables, offline); + super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); logger.debug("Expected bloom filter size : {}", estimatedTotalKeys); long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index a44ea7e..a826809 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -47,10 +47,23 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter private int sstablesWritten = 0; private final boolean skipAncestors; + public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables, + long maxSSTableSize) + { + this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false); + } + @SuppressWarnings("resource") - public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline) + public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables, + long maxSSTableSize, + boolean offline, + boolean keepOriginals) { - super(cfs, txn, nonExpiredSSTables, offline); + super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); this.maxSSTableSize = 
maxSSTableSize; this.allSSTables = txn.originals(); expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 3942b1e..241af0d 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -39,10 +39,25 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter private final long estimatedSSTables; private final Set<SSTableReader> allSSTables; + public MaxSSTableSizeWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables, + long maxSSTableSize, + int level) + { + this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false); + } + @SuppressWarnings("resource") - public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline) + public MaxSSTableSizeWriter(ColumnFamilyStore cfs, + LifecycleTransaction txn, + Set<SSTableReader> nonExpiredSSTables, + long maxSSTableSize, + int level, + boolean offline, + boolean keepOriginals) { - super(cfs, txn, nonExpiredSSTables, offline); + super(cfs, txn, nonExpiredSSTables, offline, keepOriginals); this.allSSTables = txn.originals(); this.level = level; this.maxSSTableSize = maxSSTableSize; http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 5d8670d..65924fa 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -59,7 +59,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter @SuppressWarnings("resource") public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable) { - super(cfs, txn, nonExpiredSSTables, false); + super(cfs, txn, nonExpiredSSTables, false, false); this.allSSTables = txn.originals(); totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()); double[] potentialRatios = new double[20]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java index ab6c72a..80e7831 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java +++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java @@ -592,7 +592,7 @@ public class TransactionLogs extends Transactional.AbstractTransactional impleme return; } - if (tracker != null && !wasNew) + if (tracker != null && tracker.cfstore != null && !wasNew) tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk); // release the referent to the parent so that the all transaction files can be released http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 8514dcc..42bffb1 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -60,18 +60,18 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem protected Throwable doCommit(Throwable accumulate) { - return txn.commit(writer.commit(accumulate)); + return writer.commit(txn.commit(accumulate)); } protected Throwable doAbort(Throwable accumulate) { - return txn.abort(writer.abort(accumulate)); + return writer.abort(txn.abort(accumulate)); } protected void doPrepare() { - writer.prepareToCommit(); txn.prepareToCommit(); + writer.prepareToCommit(); } public SSTableReader finish(boolean openResult) http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index 5a0c43f..e53038d 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -153,18 +153,13 @@ public class StandaloneSplitter try (LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.UNKNOWN, sstable)) { new SSTableSplitter(cfs, transaction, options.sizeInMB).split(); - - // Remove the sstable (it's been copied by split and snapshotted) - transaction.obsoleteOriginals(); } catch (Exception e) { System.err.println(String.format("Error splitting %s: %s", sstable, e.getMessage())); if (options.debug) e.printStackTrace(System.err); - } - finally - { + sstable.selfRef().release(); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index 20faa98..7db978e 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -127,7 +127,7 @@ public class LongCompactionsTest try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION)) { assert txn != null : "Cannot markCompacting all sstables"; - new CompactionTask(store, txn, gcBefore, false).execute(null); + new CompactionTask(store, txn, gcBefore).execute(null); } System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", this.getClass().getName(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index 07bd22a..cc4038d 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.TransactionLogs; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.partitions.Partition; @@ -142,7 +143,7 @@ public class ScrubTest // with skipCorrupted == false, the scrub is expected to fail try 
(LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); - Scrubber scrubber = new Scrubber(cfs, txn, false, false, true);) + Scrubber scrubber = new Scrubber(cfs, txn, false, false, true)) { scrubber.scrub(); fail("Expected a CorruptSSTableException to be thrown"); @@ -152,7 +153,7 @@ public class ScrubTest // with skipCorrupted == true, the corrupt rows will be skipped Scrubber.ScrubResult scrubResult; try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB); - Scrubber scrubber = new Scrubber(cfs, txn, true, false, true);) + Scrubber scrubber = new Scrubber(cfs, txn, true, false, true)) { scrubResult = scrubber.scrubWithResult(); } @@ -370,9 +371,9 @@ public class ScrubTest { scrubber.scrub(); } + TransactionLogs.waitForDeletions(); cfs.loadNewSSTables(); assertOrderedAll(cfs, 7); - sstable.selfRef().release(); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java index 19c56e8..6a57327 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java @@ -78,7 +78,7 @@ public class CompactionAwareWriterTest extends CQLTester populate(rowCount); LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); - CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false); + CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals()); int rows = compact(cfs, txn, writer); 
assertEquals(1, cfs.getLiveSSTables().size()); assertEquals(rowCount, rows); @@ -97,7 +97,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/10; - CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false); + CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0); int rows = compact(cfs, txn, writer); assertEquals(10, cfs.getLiveSSTables().size()); assertEquals(rowCount, rows); @@ -150,7 +150,7 @@ public class CompactionAwareWriterTest extends CQLTester LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION); long beforeSize = txn.originals().iterator().next().onDiskLength(); int sstableSize = (int)beforeSize/targetSSTableCount; - CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false); + CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize); int rows = compact(cfs, txn, writer); assertEquals(targetSSTableCount, cfs.getLiveSSTables().size()); int [] levelCounts = new int[5]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index 357298e..856ef7c 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -62,17 +62,17 @@ public class BigTableWriterTest extends AbstractTransactionalTest final Descriptor descriptor; final 
SSTableTxnWriter writer; - private TestableBTW() throws IOException + private TestableBTW() { this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables())); } - private TestableBTW(String file) throws IOException + private TestableBTW(String file) { this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS))); } - private TestableBTW(String file, SSTableTxnWriter sw) throws IOException + private TestableBTW(String file, SSTableTxnWriter sw) { super(sw); this.file = new File(file); @@ -111,11 +111,18 @@ public class BigTableWriterTest extends AbstractTransactionalTest assertPrepared(); } + @Override + protected boolean commitCanThrow() + { + return true; + } + private void assertExists(Component ... components) { for (Component component : components) Assert.assertTrue(new File(descriptor.filenameFor(component)).exists()); } + private void assertNotExists(Component ... components) { for (Component component : components) http://git-wip-us.apache.org/repos/asf/cassandra/blob/605bcdcf/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index fd22941..f8b8fa7 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -670,8 +670,6 @@ public class SSTableRewriterTest extends SchemaLoader splitter.split(); assertFileCounts(s.descriptor.directory.list()); - - s.selfRef().release(); TransactionLogs.waitForDeletions(); for (File f : s.descriptor.directory.listFiles())