http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index d79b835..004e893 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -54,14 +54,10 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.OnDiskAtom; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.index.SecondaryIndexBuilder; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -82,12 +78,14 @@ import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; +import static java.util.Collections.singleton; + /** * <p> * A singleton which manages a private executor of ongoing compactions. * </p> * Scheduling for compaction is accomplished by swapping sstables to be compacted into - * a set via DataTracker. New scheduling attempts will ignore currently compacting + * a set via Tracker. New scheduling attempts will ignore currently compacting * sstables. */ public class CompactionManager implements CompactionManagerMBean @@ -195,7 +193,7 @@ public class CompactionManager implements CompactionManagerMBean public boolean isCompacting(Iterable<ColumnFamilyStore> cfses) { for (ColumnFamilyStore cfs : cfses) - if (!cfs.getDataTracker().getCompacting().isEmpty()) + if (!cfs.getTracker().getCompacting().isEmpty()) return true; return false; } @@ -245,22 +243,22 @@ public class CompactionManager implements CompactionManagerMBean } } - private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException + private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException { - Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting(); - if (compactingSSTables == null) - { - logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.ABORTED; - } - if (Iterables.isEmpty(compactingSSTables)) + try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);) { - logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); - return AllSSTableOpStatus.SUCCESSFUL; - } - try - { - Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables); + if (compacting == null) + { + logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.ABORTED; + } + if (compacting.originals().isEmpty()) + { + logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } + + Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals()); List<Future<Object>> futures = new ArrayList<>(); for (final SSTableReader sstable : sstables) @@ -271,31 +269,30 @@ public class CompactionManager implements CompactionManagerMBean return AllSSTableOpStatus.ABORTED; } + final LifecycleTransaction txn = compacting.split(singleton(sstable)); futures.add(executor.submit(new Callable<Object>() { @Override public Object call() throws Exception { - operation.execute(sstable); + operation.execute(txn); return this; } })); } + assert compacting.originals().isEmpty(); + for (Future<Object> f : futures) f.get(); + return AllSSTableOpStatus.SUCCESSFUL; } - finally - { - cfs.getDataTracker().unmarkCompacting(compactingSSTables); - } - return AllSSTableOpStatus.SUCCESSFUL; } private static interface OneSSTableOperation { Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input); - void execute(SSTableReader input) throws IOException; + void execute(LifecycleTransaction input) throws IOException; } public enum AllSSTableOpStatus { ABORTED(1), SUCCESSFUL(0); @@ -318,11 +315,11 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public void execute(SSTableReader input) throws IOException + public void execute(LifecycleTransaction input) throws IOException { scrubOne(cfs, input, skipCorrupted, checkData); } - }); + }, OperationType.SCRUB); } public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException @@ -337,11 +334,11 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public void execute(SSTableReader input) throws IOException + public void execute(LifecycleTransaction input) throws IOException { - verifyOne(cfs, input, extendedVerify); + verifyOne(cfs, input.onlyOne(), extendedVerify); } - }); + }, OperationType.VERIFY); } public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException @@ -362,14 +359,14 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public void execute(SSTableReader input) throws IOException + public void execute(LifecycleTransaction txn) throws IOException { - AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE); + AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(txn, NO_GC, Long.MAX_VALUE); task.setUserDefined(true); task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(metrics); } - }); + }, OperationType.UPGRADE_SSTABLES); } public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException @@ -395,12 +392,12 @@ public class CompactionManager implements CompactionManagerMBean } @Override - public void execute(SSTableReader input) throws IOException + public void execute(LifecycleTransaction txn) throws IOException { CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges); - doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes); + doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes); } - }); + }, OperationType.CLEANUP); } public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, @@ -412,19 +409,19 @@ public class CompactionManager implements CompactionManagerMBean @Override public void runMayThrow() throws Exception { - boolean success = false; - while (!success) + LifecycleTransaction modifier = null; + while (modifier == null) { - for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting()) + for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting()) sstables.releaseIfHolds(compactingSSTable); Set<SSTableReader> compactedSSTables = new HashSet<>(); for (SSTableReader sstable : sstables) if (sstable.isMarkedCompacted()) compactedSSTables.add(sstable); sstables.release(compactedSSTables); - success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables); + modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); } - performAnticompaction(cfs, ranges, sstables, repairedAt); + performAnticompaction(cfs, ranges, sstables, modifier, repairedAt); } }; if (executor.isShutdown()) @@ -452,6 +449,7 @@ public class CompactionManager implements CompactionManagerMBean public void performAnticompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> validatedForRepair, + LifecycleTransaction txn, long repairedAt) throws InterruptedException, IOException { logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size()); @@ -490,16 +488,18 @@ public class CompactionManager implements CompactionManagerMBean } } } - cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); - cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses); + txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses)); validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + assert txn.originals().equals(sstables); if (!sstables.isEmpty()) - doAntiCompaction(cfs, ranges, sstables, repairedAt); + doAntiCompaction(cfs, ranges, txn, repairedAt); + txn.finish(); } finally { validatedForRepair.release(); - cfs.getDataTracker().unmarkCompacting(sstables); + txn.close(); } logger.info("Completed anticompaction successfully"); @@ -657,9 +657,9 @@ public class CompactionManager implements CompactionManagerMBean } } - private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException + private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException { - Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData); + Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData); CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo(); metrics.beginCompaction(scrubInfo); @@ -750,15 +750,16 @@ public class CompactionManager implements CompactionManagerMBean * * @throws IOException */ - private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException + private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException { assert !cfs.isIndex(); - Set<SSTableReader> sstableSet = Collections.singleton(sstable); + SSTableReader sstable = txn.onlyOne(); if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) { - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP); + txn.obsoleteOriginals(); + txn.finish(); return; } if (!needsCleanup(sstable, ranges)) @@ -772,13 +773,13 @@ public class CompactionManager implements CompactionManagerMBean long totalkeysWritten = 0; int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), - (int) (SSTableReader.getApproximateKeyCount(sstableSet))); + (int) (SSTableReader.getApproximateKeyCount(txn.originals()))); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize); logger.info("Cleaning up {}", sstable); - File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP)); + File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP)); if (compactionFileLocation == null) throw new IOException("disk full"); @@ -786,10 +787,9 @@ public class CompactionManager implements CompactionManagerMBean CleanupInfo ci = new CleanupInfo(sstable, scanner); metrics.beginCompaction(ci); - Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable); List<SSTableReader> finished; - try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false); - CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs))) + try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false); + CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs))) { writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable)); @@ -811,7 +811,6 @@ public class CompactionManager implements CompactionManagerMBean cfs.indexManager.flushIndexesBlocking(); finished = writer.finish(); - cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP); } finally { @@ -970,11 +969,11 @@ public class CompactionManager implements CompactionManagerMBean } } return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)), - (long)expectedBloomFilterSize, - repairedAt, - cfs.metadata, - cfs.partitioner, - new MetadataCollector(sstables, cfs.metadata.comparator, minLevel)); + (long) expectedBloomFilterSize, + repairedAt, + cfs.metadata, + cfs.partitioner, + new MetadataCollector(sstables, cfs.metadata.comparator, minLevel)); } @@ -1057,7 +1056,7 @@ public class CompactionManager implements CompactionManagerMBean long numPartitions = 0; for (SSTableReader sstable : sstables) { - numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range)); + numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range)); } // determine tree depth from number of partitions, but cap at 20 to prevent large tree. int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; @@ -1119,37 +1118,39 @@ public class CompactionManager implements CompactionManagerMBean * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted * and subsequently deleted. * @param cfs - * @param repairedSSTables + * @param repaired a transaction over the repaired sstables to anticompacy * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field. */ - private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, - Collection<SSTableReader> repairedSSTables, long repairedAt) + private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt) { - logger.info("Performing anticompaction on {} sstables", repairedSSTables.size()); + logger.info("Performing anticompaction on {} sstables", repaired.originals().size()); //Group SSTables - Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables); + Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals()); // iterate over sstables to check if the repaired / unrepaired ranges intersect them. int antiCompactedSSTableCount = 0; for (Collection<SSTableReader> sstableGroup : groupedSSTables) { - int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt); - antiCompactedSSTableCount += antiCompacted; + try (LifecycleTransaction txn = repaired.split(sstableGroup)) + { + int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt); + antiCompactedSSTableCount += antiCompacted; + } } String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount); + logger.info(format, repaired.originals().size(), antiCompactedSSTableCount); } private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, - Collection<SSTableReader> anticompactionGroup, long repairedAt) + LifecycleTransaction anticompactionGroup, long repairedAt) { long groupMaxDataAge = -1; // check that compaction hasn't stolen any sstables used in previous repair sessions // if we need to skip the anticompaction, it will be carried out by the next repair - for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();) + for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();) { SSTableReader sstable = i.next(); if (!new File(sstable.getFilename()).exists()) @@ -1162,26 +1163,25 @@ public class CompactionManager implements CompactionManagerMBean groupMaxDataAge = sstable.maxDataAge; } - - if (anticompactionGroup.size() == 0) + if (anticompactionGroup.originals().size() == 0) { logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); return 0; } logger.info("Anticompacting {}", anticompactionGroup); - Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup); + Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); long repairedKeyCount = 0; long unrepairedKeyCount = 0; AbstractCompactionStrategy strategy = cfs.getCompactionStrategy(); - try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false); - SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup); + try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false); + SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false); + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs))) { - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup))); + int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet)); unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet)); @@ -1212,12 +1212,18 @@ public class CompactionManager implements CompactionManagerMBean { metrics.finishCompaction(ci); } - // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them - // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness + List<SSTableReader> anticompactedSSTables = new ArrayList<>(); - anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish()); - anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish()); - cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION); + // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, + // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API + // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted. + anticompactionGroup.permitRedundantTransitions(); + repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit(); + unRepairedSSTableWriter.prepareToCommit(); + anticompactedSSTables.addAll(repairedSSTableWriter.finished()); + anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); + repairedSSTableWriter.commit(); + unRepairedSSTableWriter.commit(); logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, repairedKeyCount + unrepairedKeyCount,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 34f57c1..e593ec0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.db.compaction; -import java.io.File; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -44,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.UUIDGen; @@ -55,9 +54,9 @@ public class CompactionTask extends AbstractCompactionTask protected static long totalBytesCompacted = 0; private CompactionExecutorStatsCollector collector; - public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline) + public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline) { - super(cfs, Sets.newHashSet(sstables)); + super(cfs, txn); this.gcBefore = gcBefore; this.offline = offline; } @@ -71,23 +70,20 @@ public class CompactionTask extends AbstractCompactionTask { this.collector = collector; run(); - return sstables.size(); + return transaction.originals().size(); } public boolean reduceScopeForLimitedSpace() { - if (partialCompactionsAcceptable() && sstables.size() > 1) + if (partialCompactionsAcceptable() && transaction.originals().size() > 1) { // Try again w/o the largest one. - logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", ")); + logger.warn("insufficient space to compact all requested files {}", StringUtils.join(transaction.originals(), ", ")); // Note that we have removed files that are still marked as compacting. // This suboptimal but ok since the caller will unmark all the sstables at the end. - SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables); - if (sstables.remove(removedSSTable)) - { - cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable)); - return true; - } + SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals()); + transaction.cancel(removedSSTable); + return true; } return false; } @@ -101,9 +97,9 @@ public class CompactionTask extends AbstractCompactionTask { // The collection of sstables passed may be empty (but not null); even if // it is not empty, it may compact down to nothing if all rows are deleted. - assert sstables != null; + assert transaction != null; - if (sstables.size() == 0) + if (transaction.originals().isEmpty()) return; // Note that the current compaction strategy, is not necessarily the one this task was created under. @@ -115,12 +111,12 @@ public class CompactionTask extends AbstractCompactionTask // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here - long expectedWriteSize = cfs.getExpectedCompactedFileSize(sstables, compactionType); + long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType); long earlySSTableEstimate = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes()); checkAvailableDiskSpace(earlySSTableEstimate, expectedWriteSize); // sanity check: all sstables must belong to the same cfs - assert !Iterables.any(sstables, new Predicate<SSTableReader>() + assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>() { @Override public boolean apply(SSTableReader sstable) @@ -129,13 +125,13 @@ public class CompactionTask extends AbstractCompactionTask } }); - UUID taskId = SystemKeyspace.startCompaction(cfs, sstables); + UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals()); // new sstables from flush can be added during a compaction, but only the compaction can remove them, // so in our single-threaded compaction world this is a valid way of determining if we're compacting // all the sstables (that existed when we started) StringBuilder ssTableLoggerMsg = new StringBuilder("["); - for (SSTableReader sstr : sstables) + for (SSTableReader sstr : transaction.originals()) { ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel())); } @@ -148,11 +144,11 @@ public class CompactionTask extends AbstractCompactionTask long totalKeysWritten = 0; long estimatedKeys = 0; - try (CompactionController controller = getCompactionController(sstables)) + try (CompactionController controller = getCompactionController(transaction.originals())) { - Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables()); + Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - SSTableFormat.Type sstableFormat = getFormatType(sstables); + SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); List<SSTableReader> newSStables; AbstractCompactionIterable ci; @@ -171,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask if (!controller.cfs.getCompactionStrategy().isActive) throw new CompactionInterruptedException(ci.getCompactionInfo()); - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact)) + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) { estimatedKeys = writer.estimatedKeys(); while (iter.hasNext()) @@ -205,13 +201,9 @@ public class CompactionTask extends AbstractCompactionTask } } - Collection<SSTableReader> oldSStables = this.sstables; - if (!offline) - cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType); - // log a bunch of statistics about the result and save to system table compaction_history long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTableReader.getTotalBytes(oldSStables); + long startsize = SSTableReader.getTotalBytes(transaction.originals()); long endsize = SSTableReader.getTotalBytes(newSStables); double ratio = (double) endsize / (double) startsize; @@ -223,7 +215,7 @@ public class CompactionTask extends AbstractCompactionTask long totalSourceRows = 0; String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); @@ -236,9 +228,9 @@ public class CompactionTask extends AbstractCompactionTask } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables) { - return new DefaultCompactionWriter(cfs, allSSTables, nonExpiredSSTables, offline, compactionType); + return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 6385671..18d5f7b 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Pair; @@ -67,8 +68,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (latestBucket.isEmpty()) return null; - if (cfs.getDataTracker().markCompacting(latestBucket)) - return new CompactionTask(cfs, latestBucket, gcBefore, false); + LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION); + if (modifier != null) + return new CompactionTask(cfs, modifier, gcBefore, false); } } @@ -366,11 +368,11 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy @Override public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput) { - Iterable<SSTableReader> sstables = cfs.markAllCompacting(); - if (sstables == null) + LifecycleTransaction modifier = cfs.markAllCompacting(OperationType.COMPACTION); + if (modifier == null) return null; - return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore, false)); + return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, modifier, gcBefore, false)); } @Override @@ -378,13 +380,14 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy { assert !sstables.isEmpty(); // checked for by CM.submitUserDefined - if (!cfs.getDataTracker().markCompacting(sstables)) + LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + if (modifier == null) { logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); return null; } - return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true); + return new CompactionTask(cfs, modifier, gcBefore, false).setUserDefined(true); } public int getEstimatedRemainingTasks() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 6b82ad3..c434d31 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -115,9 +116,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy op = OperationType.COMPACTION; } - if (cfs.getDataTracker().markCompacting(candidate.sstables)) + LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION); + if (txn != null) { - LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes, false); + LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false); newTask.setCompactionType(op); return newTask; } @@ -131,9 +133,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables); if (Iterables.isEmpty(sstables)) return null; - if (!cfs.getDataTracker().markCompacting(filteredSSTables)) + LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION); + if (txn == null) return null; - return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, filteredSSTables, 0, gcBefore, getMaxSSTableBytes(), true)); + return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, txn, 0, gcBefore, getMaxSSTableBytes(), true)); } @@ -144,19 +147,19 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy } @Override - public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, int gcBefore, long maxSSTableBytes) + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) { - assert sstables.size() > 0; + assert txn.originals().size() > 0; int level = -1; // if all sstables are in the same level, we can set that level: - for (SSTableReader sstable : sstables) + for (SSTableReader sstable : txn.originals()) { if (level == -1) level = sstable.getSSTableLevel(); if (level != sstable.getSSTableLevel()) level = 0; } - return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes, false); + return new LeveledCompactionTask(cfs, txn, level, gcBefore, maxSSTableBytes, false); } /** @@ -226,7 +229,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy { for (Integer level : byLevel.keySet()) { - // level can be -1 when sstables are added to DataTracker but not to LeveledManifest + // level can be -1 when sstables are added to Tracker but not to LeveledManifest // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables. if (level <= 0) { @@ -402,7 +405,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy if (sstables.isEmpty()) continue; - Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); + Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); for (SSTableReader sstable : sstables) { if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java index ce9dfaf..1c3b686 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java @@ -24,6 +24,7 @@ import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter; import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; public class LeveledCompactionTask extends CompactionTask { @@ -31,20 +32,20 @@ public class LeveledCompactionTask extends CompactionTask private final long maxSSTableBytes; private final boolean majorCompaction; - public LeveledCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes, boolean majorCompaction) + public LeveledCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level, int gcBefore, long maxSSTableBytes, boolean majorCompaction) { - super(cfs, sstables, gcBefore, false); + super(cfs, txn, gcBefore, false); this.level = level; this.maxSSTableBytes = maxSSTableBytes; this.majorCompaction = majorCompaction; } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { if (majorCompaction) - return new MajorLeveledCompactionWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, false, compactionType); - return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType); + return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index daff131..0d0928f 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -23,7 +23,6 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -316,7 +315,7 @@ public class LeveledManifest continue; // mostly this just avoids polluting the debug log with zero scores // we want to calculate score excluding compacting ones Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables); - Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting()); + Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting()); double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes); logger.debug("Compaction score for level {} is {}", i, score); @@ -361,7 +360,7 @@ public class LeveledManifest private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables) { - Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables); + Iterable<SSTableReader> candidates = cfs.getTracker().getUncompacting(sstables); List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates)); List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, options.bucketHigh, @@ -415,7 +414,7 @@ public class LeveledManifest } if (min == null || max == null || min.equals(max)) // single partition sstables - we cannot include a high level sstable. return candidates; - Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); + Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); Range<RowPosition> boundaries = new Range<>(min, max); for (SSTableReader sstable : getLevel(i)) { @@ -542,7 +541,7 @@ public class LeveledManifest assert !getLevel(level).isEmpty(); logger.debug("Choosing candidates for L{}", level); - final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting(); + final Set<SSTableReader> compacting = cfs.getTracker().getCompacting(); if (level == 0) { @@ -650,7 +649,7 @@ public class LeveledManifest { Set<SSTableReader> sstables = new HashSet<>(); Set<SSTableReader> levelSSTables = new HashSet<>(getLevel(level)); - for (SSTableReader sstable : cfs.getDataTracker().getCompacting()) + for (SSTableReader sstable : cfs.getTracker().getCompacting()) { if (levelSSTables.contains(sstable)) sstables.add(sstable); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java index 8d7b0e9..e9a4f05 100644 --- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java +++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java @@ -23,6 +23,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; public class SSTableSplitter { @@ -30,9 +31,9 @@ public class SSTableSplitter { private CompactionInfo.Holder info; - public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB) + public SSTableSplitter(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB) { - this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB); + this.task = new SplittingCompactionTask(cfs, transaction, sstableSizeInMB); } public void split() @@ -57,9 +58,9 @@ public class SSTableSplitter { { private final int sstableSizeInMB; - public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB) + public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB) { - super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true); + super(cfs, transaction, CompactionManager.NO_GC, true); this.sstableSizeInMB = sstableSizeInMB; if (sstableSizeInMB <= 0) @@ -73,9 +74,9 @@ public class SSTableSplitter { } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType); + return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 1e014ed..b7c149c 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -23,9 +23,9 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; -import com.google.common.collect.Sets; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -41,6 +41,7 @@ public class Scrubber implements Closeable { private final ColumnFamilyStore cfs; private final SSTableReader sstable; + private final LifecycleTransaction transaction; private final File destination; private final boolean skipCorrupted; @@ -80,15 +81,16 @@ public class Scrubber implements Closeable }; private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator); - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException + public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException { - this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData); + this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData); } - public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException + public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException { this.cfs = cfs; - this.sstable = sstable; + this.transaction = transaction; + this.sstable = transaction.onlyOne(); this.outputHandler = outputHandler; this.skipCorrupted = skipCorrupted; this.isOffline = isOffline; @@ -127,9 +129,7 @@ public class Scrubber implements Closeable public void scrub() { outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length())); - Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable); - - try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);) + try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline);) { nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); { @@ -278,8 +278,7 @@ public class Scrubber implements Closeable inOrderWriter.append(row.key, row.cf); newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true); } - if (!isOffline) - cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable)); + transaction.update(newInOrderSstable, false); outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable)); } @@ -287,8 +286,6 @@ public class Scrubber implements Closeable List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish(); if (!finished.isEmpty()) newSstable = finished.get(0); - if (!isOffline) - cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 722536c..94c3daf 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -21,22 +21,19 @@ import java.util.*; import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; -import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.ColumnNameHelper; import org.apache.cassandra.utils.Pair; public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy @@ -190,8 +187,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy if (hottestBucket.isEmpty()) return null; - if (cfs.getDataTracker().markCompacting(hottestBucket)) - return new CompactionTask(cfs, hottestBucket, gcBefore, false); + LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION); + if (transaction != null) + return new CompactionTask(cfs, transaction, gcBefore, false); } } @@ -200,24 +198,26 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables); if (Iterables.isEmpty(filteredSSTables)) return null; - if (!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables))) + LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION); + if (txn == null) return null; if (splitOutput) - return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, filteredSSTables, gcBefore, false)); - return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false)); + return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, txn, gcBefore, false)); + return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false)); } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore) { assert !sstables.isEmpty(); // checked for by CM.submitUserDefined - if (!cfs.getDataTracker().markCompacting(sstables)) + LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + if (transaction == null) { logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); return null; } - return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true); + return new CompactionTask(cfs, transaction, gcBefore, false).setUserDefined(true); } public int getEstimatedRemainingTasks() @@ -338,15 +338,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy private static class SplittingCompactionTask extends CompactionTask { - public SplittingCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline) + public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline) { - super(cfs, sstables, gcBefore, offline); + super(cfs, txn, gcBefore, offline); } @Override - public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables) + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables) { - return new SplittingSizeTieredCompactionWriter(cfs, allSSTables, nonExpiredSSTables, compactionType); + return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 5bb1530..6556a71 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -20,12 +20,10 @@ package org.apache.cassandra.db.compaction; import java.io.File; import java.util.*; -import com.google.common.base.Throwables; -import com.google.common.collect.Sets; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -37,6 +35,7 @@ public class Upgrader { private final ColumnFamilyStore cfs; private final SSTableReader sstable; + private final LifecycleTransaction transaction; private final File directory; private final OperationType compactionType = OperationType.UPGRADE_SSTABLES; @@ -46,10 +45,11 @@ public class Upgrader private final OutputHandler outputHandler; - public Upgrader(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler) + public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler outputHandler) { this.cfs = cfs; - this.sstable = sstable; + this.transaction = txn; + this.sstable = txn.onlyOne(); this.outputHandler = outputHandler; this.directory = new File(sstable.getFilename()).getParentFile(); @@ -81,10 +81,9 @@ public class Upgrader public void upgrade() { outputHandler.output("Upgrading " + sstable); - Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable); - try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade)) + try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true); + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals())) { Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator(); writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java index 5345d8d..c511bcd 100644 --- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java @@ -50,7 +50,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy { super(cfs, cfs.metadata.compactionStrategyOptions); reloadCompactionStrategy(cfs.metadata); - cfs.getDataTracker().subscribe(this); + cfs.getTracker().subscribe(this); logger.debug("{} subscribed to the data tracker.", this); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index fe43186..20c96d6 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -25,6 +25,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.CompactionTask; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.concurrent.Transactional; @@ -43,14 +44,14 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final long minRepairedAt; protected final SSTableRewriter sstableWriter; - public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline) + public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline) { this.cfs = cfs; this.nonExpiredSSTables = nonExpiredSSTables; this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables); this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables); this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); - this.sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline); + this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline); } /** @@ -67,12 +68,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa } @Override - protected Throwable doCleanup(Throwable accumulate) - { - return accumulate; - } - - @Override protected Throwable doCommit(Throwable accumulate) { return sstableWriter.commit(accumulate); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index 3589b54..0b31061 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers; import java.io.File; -import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -28,14 +27,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.AbstractCompactedRow; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import static org.apache.cassandra.utils.Throwables.maybeFail; - /** * The default compaction writer - creates one output file in L0 @@ -44,9 +41,9 @@ public class DefaultCompactionWriter extends CompactionAwareWriter { protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class); - public DefaultCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType) + public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType) { - super(cfs, allSSTables, nonExpiredSSTables, offline); + super(cfs, txn, nonExpiredSSTables, offline); logger.debug("Expected bloom filter size : {}", estimatedTotalKeys); long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType); File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize)); @@ -55,7 +52,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter minRepairedAt, cfs.metadata, cfs.partitioner, - new MetadataCollector(allSSTables, cfs.metadata.comparator, 0)); + new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0)); sstableWriter.switchWriter(writer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index d48140e..014b4af 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -18,8 +18,6 @@ package org.apache.cassandra.db.compaction.writers; import java.io.File; -import java.util.Collections; -import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -28,11 +26,10 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.compaction.LeveledManifest; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -50,11 +47,11 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter private int sstablesWritten = 0; private final boolean skipAncestors; - public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType) + public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType) { - super(cfs, allSSTables, nonExpiredSSTables, offline); + super(cfs, txn, nonExpiredSSTables, offline); this.maxSSTableSize = maxSSTableSize; - this.allSSTables = allSSTables; + this.allSSTables = txn.originals(); expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType)); long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize); long keysPerSSTable = estimatedTotalKeys / estimatedSSTables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index ab24bf8..8903ff7 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -18,16 +18,14 @@ package org.apache.cassandra.db.compaction.writers; import java.io.File; -import java.util.List; import java.util.Set; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -41,10 +39,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter private final long estimatedSSTables; private final Set<SSTableReader> allSSTables; - public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType) + public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType) { - super(cfs, allSSTables, nonExpiredSSTables, offline); - this.allSSTables = allSSTables; + super(cfs, txn, nonExpiredSSTables, offline); + this.allSSTables = txn.originals(); this.level = level; this.maxSSTableSize = maxSSTableSize; long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 2a452c7..81ea6b1 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers; import java.io.File; import java.util.Arrays; -import java.util.List; import java.util.Set; import org.slf4j.Logger; @@ -28,10 +27,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.compaction.AbstractCompactedRow; -import org.apache.cassandra.db.compaction.CompactionTask; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -53,15 +51,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter private long currentBytesToWrite; private int currentRatioIndex = 0; - public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType) + public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType) { - this(cfs, allSSTables, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES); + this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES); } - public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable) + public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable) { - super(cfs, allSSTables, nonExpiredSSTables, false); - this.allSSTables = allSSTables; + super(cfs, txn, nonExpiredSSTables, false); + this.allSSTables = txn.originals(); totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType); double[] potentialRatios = new double[20]; double currentRatio = 1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index f8b3aba..ba48350 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -57,7 +57,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec indexedCfMetadata.cfName, new LocalPartitioner(getIndexKeyComparator()), indexedCfMetadata, - baseCfs.getDataTracker().loadsstables); + baseCfs.getTracker().loadsstables); } protected AbstractType<?> getIndexKeyComparator() @@ -143,7 +143,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec { Future<?> wait; // we synchronise on the baseCfs to make sure we are ordered correctly with other flushes to the base CFS - synchronized (baseCfs.getDataTracker()) + synchronized (baseCfs.getTracker()) { wait = indexCfs.forceFlush(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index dda532d..4c1bf45 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -346,7 +346,7 @@ public class SecondaryIndexManager { // despatch flushes for all CFS backed indexes List<Future<?>> wait = new ArrayList<>(); - synchronized (baseCfs.getDataTracker()) + synchronized (baseCfs.getTracker()) { for (SecondaryIndex index : allIndexes) if (index.getIndexCfs() != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Helpers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java new file mode 100644 index 0000000..05f7531 --- /dev/null +++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.util.*; + +import com.google.common.base.Predicate; +import com.google.common.collect.*; + +import org.apache.cassandra.io.sstable.format.SSTableReader; + +import static com.google.common.base.Predicates.*; +import static com.google.common.collect.Iterables.any; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.getFirst; +import static org.apache.cassandra.utils.Throwables.merge; + +class Helpers +{ + /** + * update the contents of a set with the provided sets, ensuring that the items to remove are + * really present, and that the items to add are not (unless we're also removing them) + * @return a new set with the contents of the provided one modified + */ + static <T> Set<T> replace(Set<T> original, Set<T> remove, Iterable<T> add) + { + return ImmutableSet.copyOf(replace(identityMap(original), remove, add).keySet()); + } + + /** + * update the contents of an "identity map" with the provided sets, ensuring that the items to remove are + * really present, and that the items to add are not (unless we're also removing them) + * @return a new identity map with the contents of the provided one modified + */ + static <T> Map<T, T> replace(Map<T, T> original, Set<T> remove, Iterable<T> add) + { + // ensure the ones being removed are the exact same ones present + for (T reader : remove) + assert original.get(reader) == reader; + + // ensure we don't already contain any we're adding, that we aren't also removing + assert !any(add, and(not(in(remove)), in(original.keySet()))) : String.format("original:%s remove:%s add:%s", original.keySet(), remove, add); + + Map<T, T> result = + identityMap(concat(add, filter(original.keySet(), not(in(remove))))); + + assert result.size() == original.size() - remove.size() + Iterables.size(add) : + String.format("Expecting new size of %d, got %d while replacing %s by %s in %s", + original.size() - remove.size() + Iterables.size(add), result.size(), remove, add, original.keySet()); + return result; + } + + /** + * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety + * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise + */ + static Throwable setupDeleteNotification(Iterable<SSTableReader> readers, Tracker tracker, Throwable accumulate) + { + try + { + for (SSTableReader reader : readers) + reader.setupDeleteNotification(tracker); + } + catch (Throwable t) + { + // shouldn't be possible, but in case the contract changes in future and we miss it... + accumulate = merge(accumulate, t); + } + return accumulate; + } + + /** + * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety + * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise + */ + static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate) + { + for (SSTableReader reader : readers) + { + try + { + reader.setReplaced(); + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + return accumulate; + } + + /** + * assert that none of these readers have been replaced + */ + static void checkNotReplaced(Iterable<SSTableReader> readers) + { + for (SSTableReader reader : readers) + assert !reader.isReplaced(); + } + + /** + * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety + * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise + */ + static Throwable markObsolete(Iterable<SSTableReader> readers, Throwable accumulate) + { + for (SSTableReader reader : readers) + { + try + { + boolean firstToCompact = reader.markObsolete(); + assert firstToCompact : reader + " was already marked compacted"; + } + catch (Throwable t) + { + accumulate = merge(accumulate, t); + } + } + return accumulate; + } + + /** + * @return the identity function, as a Map, with domain of the provided values + */ + static <T> Map<T, T> identityMap(Iterable<T> values) + { + ImmutableMap.Builder<T, T> builder = ImmutableMap.<T, T>builder(); + for (T t : values) + builder.put(t, t); + return builder.build(); + } + + /** + * @return an Iterable of the union if the sets, with duplicates being represented by their first encountered instance + * (as defined by the order of set provision) + */ + static <T> Iterable<T> concatUniq(Set<T>... sets) + { + List<Predicate<T>> notIn = new ArrayList<>(sets.length); + for (Set<T> set : sets) + notIn.add(not(in(set))); + List<Iterable<T>> results = new ArrayList<>(sets.length); + for (int i = 0 ; i < sets.length ; i++) + results.add(filter(sets[i], and(notIn.subList(0, i)))); + return concat(results); + } + + /** + * @return a Predicate yielding true for an item present in NONE of the provided sets + */ + static <T> Predicate<T> notIn(Set<T>... sets) + { + return not(orIn(sets)); + } + + /** + * @return a Predicate yielding true for an item present in ANY of the provided sets + */ + static <T> Predicate<T> orIn(Collection<T>... sets) + { + Predicate<T>[] orIn = new Predicate[sets.length]; + for (int i = 0 ; i < orIn.length ; i++) + orIn[i] = in(sets[i]); + return or(orIn); + } + + /** + * filter out (i.e. remove) matching elements + * @return filter, filtered to only those elements that *are not* present in *any* of the provided sets (are present in none) + */ + static <T> Iterable<T> filterOut(Iterable<T> filter, Set<T>... inNone) + { + return filter(filter, notIn(inNone)); + } + + /** + * filter in (i.e. retain) + * + * @return filter, filtered to only those elements that *are* present in *any* of the provided sets + */ + static <T> Iterable<T> filterIn(Iterable<T> filter, Set<T>... inAny) + { + return filter(filter, orIn(inAny)); + } + + static Set<SSTableReader> emptySet() + { + return Collections.emptySet(); + } + + static <T> T select(T t, Collection<T> col) + { + if (col instanceof Set && !col.contains(t)) + return null; + return getFirst(filter(col, equalTo(t)), null); + } + + static <T> T selectFirst(T t, Collection<T> ... sets) + { + for (Collection<T> set : sets) + { + T select = select(t, set); + if (select != null) + return select; + } + return null; + } + + static <T> Predicate<T> idIn(Set<T> set) + { + return idIn(identityMap(set)); + } + + static <T> Predicate<T> idIn(final Map<T, T> identityMap) + { + return new Predicate<T>() + { + public boolean apply(T t) + { + return identityMap.get(t) == t; + } + }; + } + +}