Garbage-collecting compaction operation and schema option. patch by Branimir Lambov; reviewed by Marcus Eriksson for CASSANDRA-7019
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d40ac784 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d40ac784 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d40ac784 Branch: refs/heads/trunk Commit: d40ac784d3a8ddaf71a2df8b21745827392294cc Parents: 2939080 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Wed Dec 23 12:50:58 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Aug 1 14:20:50 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 22 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 6 + .../compaction/AbstractCompactionStrategy.java | 2 + .../db/compaction/CompactionController.java | 68 +++- .../db/compaction/CompactionIterator.java | 243 +++++++++++- .../db/compaction/CompactionManager.java | 39 +- .../DateTieredCompactionStrategy.java | 2 +- .../compaction/LeveledCompactionStrategy.java | 12 +- .../cassandra/db/compaction/OperationType.java | 3 +- .../cassandra/db/compaction/Scrubber.java | 46 ++- .../SizeTieredCompactionStrategy.java | 3 +- .../TimeWindowCompactionStrategy.java | 4 +- .../cassandra/db/compaction/Verifier.java | 2 +- .../org/apache/cassandra/db/rows/Cells.java | 80 ++++ src/java/org/apache/cassandra/db/rows/Rows.java | 77 ++++ .../cassandra/db/rows/UnfilteredSerializer.java | 54 +++ .../cassandra/index/sasi/SASIIndexBuilder.java | 2 +- .../io/sstable/SSTableIdentityIterator.java | 81 +++- .../io/sstable/SSTableSimpleIterator.java | 75 ++++ .../io/sstable/format/SSTableReader.java | 18 +- .../io/sstable/format/big/BigTableReader.java | 13 +- .../io/sstable/format/big/BigTableScanner.java | 2 +- .../cassandra/schema/CompactionParams.java | 27 +- .../cassandra/service/StorageService.java | 14 + .../cassandra/service/StorageServiceMBean.java | 6 + .../org/apache/cassandra/tools/NodeProbe.java | 17 +- 
.../org/apache/cassandra/tools/NodeTool.java | 1 + .../tools/nodetool/GarbageCollect.java | 64 ++++ .../cassandra/cql3/GcCompactionBench.java | 374 ++++++++++++++++++ .../apache/cassandra/cql3/GcCompactionTest.java | 364 ++++++++++++++++++ .../selection/SelectionColumnMappingTest.java | 2 +- .../db/compaction/CompactionIteratorTest.java | 377 +++++++++++++++++++ .../rows/UnfilteredRowIteratorsMergeTest.java | 55 +-- .../db/rows/UnfilteredRowsGenerator.java | 340 +++++++++++++++++ 35 files changed, 2399 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 36a21e6..760cc58 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019) * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190) * Add beta protocol flag for v5 native protocol (CASSANDRA-12142) * Support filtering on non-PRIMARY KEY columns in the CREATE http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 8580c7c..85f2767 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -19,6 +19,28 @@ using the provided 'sstableupgrade' tool. New features ------------ + - Compaction can now take into account overlapping tables that don't take part + in the compaction to look for deleted or overwritten data in the compacted tables. + When such data is found, it can be safely discarded, which in turn should enable + the removal of tombstones over that data. 
+ + The behavior can be engaged in two ways: + - as a "nodetool garbagecollect -g CELL/ROW" operation, which applies + single-table compaction on all sstables to discard deleted data in one step. + - as a "provide_overlapping_tombstones:CELL/ROW/NONE" compaction strategy flag, + which uses overlapping tables as a source of deletions/overwrites during all + compactions. + The argument specifies the granularity at which deleted data is to be found: + - If ROW is specified, only whole deleted rows (or sets of rows) will be + discarded. + - If CELL is specified, any columns whose value is overwritten or deleted + will also be discarded. + - NONE (default) specifies the old behavior, overlapping tables are not used to + decide when to discard data. + Which option to use depends on your workload, both ROW and CELL increase the + disk load on compaction (especially with the size-tiered compaction strategy), + with CELL being more resource-intensive. Both should lead to better read + performance if deleting rows (resp. overwriting or deleting cells) is common. - Prepared statements are now persisted in the table prepared_statements in the system keyspace. Upon startup, this table is used to preload all previously prepared statements - i.e. 
in many cases clients do not need to http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 594da98..20dac1e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -71,6 +71,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.schema.*; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; @@ -1580,6 +1581,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return CompactionManager.instance.relocateSSTables(this, jobs); } + public CompactionManager.AllSSTableOpStatus garbageCollect(TombstoneOption tombstoneOption, int jobs) throws ExecutionException, InterruptedException + { + return CompactionManager.instance.performGarbageCollection(this, tombstoneOption, jobs); + } + public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) { assert !sstables.isEmpty(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 4728ec3..83592f0 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ 
b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -44,6 +44,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.utils.JVMStabilityInspector; /** @@ -492,6 +493,7 @@ public abstract class AbstractCompactionStrategy uncheckedOptions.remove(LOG_ALL_OPTION); uncheckedOptions.remove(COMPACTION_ENABLED); uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES); + uncheckedOptions.remove(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString()); return uncheckedOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index e42e7a1..08ad0c0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -20,18 +20,23 @@ package org.apache.cassandra.db.compaction; import java.util.*; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +import com.google.common.base.Predicates; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.*; import org.apache.cassandra.utils.AlwaysPresentFilter; - import org.apache.cassandra.utils.OverlapIterator; import org.apache.cassandra.utils.concurrent.Refs; @@ -53,6 +58,10 @@ public class CompactionController implements AutoCloseable private Refs<SSTableReader> overlappingSSTables; private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator; private final Iterable<SSTableReader> compacting; + private final RateLimiter limiter; + private final long minTimestamp; + final TombstoneOption tombstoneOption; + final Map<SSTableReader, FileDataInput> openDataFiles = new HashMap<>(); public final int gcBefore; @@ -63,11 +72,23 @@ public class CompactionController implements AutoCloseable public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore) { + this(cfs, compacting, gcBefore, + CompactionManager.instance.getRateLimiter(), + cfs.getCompactionStrategyManager().getCompactionParams().tombstoneOption()); + } + + public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption) + { assert cfs != null; this.cfs = cfs; this.gcBefore = gcBefore; this.compacting = compacting; + this.limiter = limiter; compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired); + this.tombstoneOption = tombstoneOption; + this.minTimestamp = compacting != null && !compacting.isEmpty() // check needed for test + ? 
compacting.stream().mapToLong(SSTableReader::getMinTimestamp).min().getAsLong() + : 0; refreshOverlaps(); if (NEVER_PURGE_TOMBSTONES) logger.warn("You are running with -Dcassandra.never_purge_tombstones=true, this is dangerous!"); @@ -97,7 +118,7 @@ public class CompactionController implements AutoCloseable return; if (this.overlappingSSTables != null) - overlappingSSTables.release(); + close(); if (compacting == null) overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList()); @@ -228,6 +249,9 @@ public class CompactionController implements AutoCloseable { if (overlappingSSTables != null) overlappingSSTables.release(); + + FileUtils.closeQuietly(openDataFiles.values()); + openDataFiles.clear(); } public boolean compactingRepaired() @@ -235,4 +259,38 @@ public class CompactionController implements AutoCloseable return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired; } + boolean provideTombstoneSources() + { + return tombstoneOption != TombstoneOption.NONE; + } + + // caller must close iterators + public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly) + { + if (!provideTombstoneSources() || !compactingRepaired() || NEVER_PURGE_TOMBSTONES) + return null; + overlapIterator.update(key); + return Iterables.filter(Iterables.transform(overlapIterator.overlaps(), + reader -> getShadowIterator(reader, key, tombstoneOnly)), + Predicates.notNull()); + } + + @SuppressWarnings("resource") // caller to close + private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedKey key, boolean tombstoneOnly) + { + if (reader.isMarkedSuspect() || + reader.getMaxTimestamp() <= minTimestamp || + tombstoneOnly && !reader.hasTombstones()) + return null; + RowIndexEntry<?> position = reader.getPosition(key, SSTableReader.Operator.EQ); + if (position == null) + return null; + FileDataInput dfile = openDataFiles.computeIfAbsent(reader, this::openDataFile); + return 
reader.simpleIterator(dfile, key, position, tombstoneOnly); + } + + private FileDataInput openDataFile(SSTableReader reader) + { + return limiter != null ? reader.openDataReader(limiter) : reader.openDataReader(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 0111aec..c4edfa6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.db.compaction; -import java.util.List; -import java.util.UUID; +import java.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Ordering; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.PurgeFunction; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; @@ -33,6 +32,7 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.index.transactions.CompactionTransaction; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.metrics.CompactionMetrics; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; /** * Merge multiple iterators over the content of sstable into a "compacted" iterator. 
@@ -52,7 +52,6 @@ import org.apache.cassandra.metrics.CompactionMetrics; */ public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator { - private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class); private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100; private final OperationType type; @@ -104,6 +103,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte ? EmptyIterators.unfilteredPartition(controller.cfs.metadata, false) : UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()); boolean isForThrift = merged.isForThrift(); // to stop capture of iterator in Purger, which is confusing for debug + merged = Transformation.apply(merged, new GarbageSkipper(controller, nowInSec)); this.compacted = Transformation.apply(merged, new Purger(isForThrift, controller, nowInSec)); } @@ -313,4 +313,237 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return maxPurgeableTimestamp; } } + + /** + * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition. + * The result produced by this iterator is such that when merged with tombSource it produces the same output + * as the merge of dataSource and tombSource. 
+ */ + private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator + { + final UnfilteredRowIterator tombSource; + final DeletionTime partitionLevelDeletion; + final Row staticRow; + final ColumnFilter cf; + final int nowInSec; + final CFMetaData metadata; + final boolean cellLevelGC; + + DeletionTime tombOpenDeletionTime = DeletionTime.LIVE; + DeletionTime dataOpenDeletionTime = DeletionTime.LIVE; + DeletionTime openDeletionTime = DeletionTime.LIVE; + DeletionTime partitionDeletionTime; + DeletionTime activeDeletionTime; + Unfiltered tombNext = null; + Unfiltered dataNext = null; + Unfiltered next = null; + + /** + * Construct an iterator that filters out data shadowed by the provided "tombstone source". + * + * @param dataSource The input row. The result is a filtered version of this. + * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row. + * @param nowInSec Current time, used in choosing the winner when cell expiration is involved. + * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones. + * If true, deleted or overwritten cells within a surviving row will also be removed. + */ + protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC) + { + super(dataSource); + this.tombSource = tombSource; + this.nowInSec = nowInSec; + this.cellLevelGC = cellLevelGC; + metadata = dataSource.metadata(); + cf = ColumnFilter.all(metadata); + + activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion(); + + // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.) + this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ? 
+ dataSource.partitionLevelDeletion() : + DeletionTime.LIVE; + + Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow()); + this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW; + + tombNext = advance(tombSource); + dataNext = advance(dataSource); + } + + private static Unfiltered advance(UnfilteredRowIterator source) + { + return source.hasNext() ? source.next() : null; + } + + @Override + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public void close() + { + super.close(); + tombSource.close(); + } + + @Override + public Row staticRow() + { + return staticRow; + } + + @Override + public boolean hasNext() + { + // Produce the next element. This may consume multiple elements from both inputs until we find something + // from dataSource that is still live. We track the currently open deletion in both sources, as well as the + // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others + // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion + // range from the input when a suppressing deletion ends. + while (next == null && dataNext != null) + { + int cmp = tombNext == null ? 
-1 : metadata.comparator.compare(dataNext, tombNext); + if (cmp < 0) + { + if (dataNext.isRow()) + next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata); + else + next = processDataMarker(); + } + else if (cmp == 0) + { + if (dataNext.isRow()) + { + next = garbageFilterRow((Row) dataNext, (Row) tombNext); + } + else + { + tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext); + activeDeletionTime = Ordering.natural().max(partitionDeletionTime, + tombOpenDeletionTime); + next = processDataMarker(); + } + } + else // (cmp > 0) + { + if (tombNext.isRangeTombstoneMarker()) + { + tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext); + activeDeletionTime = Ordering.natural().max(partitionDeletionTime, + tombOpenDeletionTime); + boolean supersededBefore = openDeletionTime.isLive(); + boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime); + // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now. + if (supersededBefore && !supersededAfter) + next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime); + // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range. 
+ } + } + + if (next instanceof RangeTombstoneMarker) + openDeletionTime = updateOpenDeletionTime(openDeletionTime, next); + + if (cmp <= 0) + dataNext = advance(wrapped); + if (cmp >= 0) + tombNext = advance(tombSource); + } + return next != null; + } + + protected Row garbageFilterRow(Row dataRow, Row tombRow) + { + if (cellLevelGC) + { + return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec); + } + else + { + DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(), + activeDeletionTime); + return dataRow.filter(cf, deletion, false, metadata); + } + } + + /** + * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on + * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the + * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker. + */ + private RangeTombstoneMarker processDataMarker() + { + dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext); + boolean supersededBefore = openDeletionTime.isLive(); + boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime); + RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext; + if (!supersededBefore) + if (!supersededAfter) + return marker; + else + return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false)); + else + if (!supersededAfter) + return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false)); + else + return null; + } + + @Override + public Unfiltered next() + { + if (!hasNext()) + throw new IllegalStateException(); + + Unfiltered v = next; + next = null; + return v; + } + + private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) next; + assert openDeletionTime.isLive() == !marker.isClose(false); + assert 
openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false)); + return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE; + } + } + + /** + * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each + * partition using the controller's shadowSources method. + */ + private static class GarbageSkipper extends Transformation<UnfilteredRowIterator> + { + final int nowInSec; + final CompactionController controller; + final boolean cellLevelGC; + + private GarbageSkipper(CompactionController controller, int nowInSec) + { + this.controller = controller; + this.nowInSec = nowInSec; + cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL; + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC); + if (sources == null) + return partition; + List<UnfilteredRowIterator> iters = new ArrayList<>(); + for (UnfilteredRowIterator iter : sources) + { + if (!iter.isEmpty()) + iters.add(iter); + else + iter.close(); + } + if (iters.isEmpty()) + return partition; + + return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 519ff05..1cfc76b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -63,6 +63,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import 
org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; @@ -442,7 +443,7 @@ public class CompactionManager implements CompactionManagerMBean public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) { List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals()); - Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); + Collections.sort(sortedSSTables, SSTableReader.sizeComparator); return sortedSSTables; } @@ -455,6 +456,42 @@ public class CompactionManager implements CompactionManagerMBean }, jobs, OperationType.CLEANUP); } + public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException + { + assert !cfStore.isIndex(); + + return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() + { + @Override + public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction) + { + Iterable<SSTableReader> originals = transaction.originals(); + if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones()) + originals = Iterables.filter(originals, SSTableReader::isRepaired); + List<SSTableReader> sortedSSTables = Lists.newArrayList(originals); + Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator); + return sortedSSTables; + } + + @Override + public void execute(LifecycleTransaction txn) throws IOException + { + logger.debug("Garbage collecting {}", txn.originals()); + CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds())) + { + @Override + protected CompactionController 
getCompactionController(Set<SSTableReader> toCompact) + { + return new CompactionController(cfStore, toCompact, gcBefore, getRateLimiter(), tombstoneOption); + } + }; + task.setUserDefined(true); + task.setCompactionType(OperationType.GARBAGE_COLLECT); + task.execute(metrics); + } + }, jobs, OperationType.GARBAGE_COLLECT); + } + public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException { if (!cfs.getPartitioner().splitter().isPresent()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index cfe0121..5442a2d 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -138,7 +138,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy if (sstablesWithTombstones.isEmpty()) return Collections.emptyList(); - return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator())); + return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator)); } private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables, long now, int base) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index ec5e1d9..25c5d20 
100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -145,7 +145,17 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy @Override public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) { - throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions"); + if (sstables.size() != 1) + throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions"); + + LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + if (transaction == null) + { + logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); + return null; + } + int level = sstables.iterator().next().getSSTableLevel(); + return getCompactionTask(transaction, gcBefore, level == 0 ? 
Integer.MAX_VALUE : getMaxSSTableBytes()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index 84a34c9..27b8530 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -38,7 +38,8 @@ public enum OperationType WRITE("Write"), VIEW_BUILD("View build"), INDEX_SUMMARY("Index summary redistribution"), - RELOCATE("Relocate sstables to correct disk"); + RELOCATE("Relocate sstables to correct disk"), + GARBAGE_COLLECT("Remove deleted data"); public final String type; public final String fileName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index a9cb211..2cfc75d 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -210,7 +210,8 @@ public class Scrubber implements Closeable if (indexFile != null && dataStart != dataStartFromIndex) outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataStartFromIndex)); - try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(sstable, dataFile, key), dataFile.getPath())) + try (UnfilteredRowIterator iterator = withValidation(new RowMergingSSTableIterator(SSTableIdentityIterator.create(sstable, dataFile, key)), + dataFile.getPath())) { if (prevKey != null && prevKey.compareTo(key) > 0) { @@ -241,7 +242,7 @@ 
public class Scrubber implements Closeable { dataFile.seek(dataStartFromIndex); - try (UnfilteredRowIterator iterator = withValidation(new SSTableIdentityIterator(sstable, dataFile, key), dataFile.getPath())) + try (UnfilteredRowIterator iterator = withValidation(SSTableIdentityIterator.create(sstable, dataFile, key), dataFile.getPath())) { if (prevKey != null && prevKey.compareTo(key) > 0) { @@ -471,38 +472,43 @@ public class Scrubber implements Closeable * * For more details, refer to CASSANDRA-12144. */ - private static class RowMergingSSTableIterator extends SSTableIdentityIterator + private static class RowMergingSSTableIterator extends WrappingUnfilteredRowIterator { - RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key) + Unfiltered nextToOffer = null; + + RowMergingSSTableIterator(UnfilteredRowIterator source) { - super(sstable, file, key); + super(source); } @Override - protected Unfiltered doCompute() + public boolean hasNext() { - if (!iterator.hasNext()) - return endOfData(); + return nextToOffer != null || wrapped.hasNext(); + } - Unfiltered next = iterator.next(); - if (!next.isRow()) - return next; + @Override + public Unfiltered next() + { + Unfiltered next = nextToOffer != null ? nextToOffer : wrapped.next(); - while (iterator.hasNext()) + if (next.isRow()) { - Unfiltered peek = iterator.peek(); - // If there was a duplicate row, merge it. - if (next.clustering().equals(peek.clustering()) && peek.isRow()) + while (wrapped.hasNext()) { - iterator.next(); // Make sure that the peeked item was consumed. + Unfiltered peek = wrapped.next(); + if (!peek.isRow() || !next.clustering().equals(peek.clustering())) + { + nextToOffer = peek; // Offer peek in next call + return next; + } + + // Duplicate row, merge it. 
next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds()); } - else - { - break; - } } + nextToOffer = null; return next; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 7cca4a7..8302a9b 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -101,8 +101,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy if (sstablesWithTombstones.isEmpty()) return Collections.emptyList(); - Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator()); - return Collections.singletonList(sstablesWithTombstones.get(0)); + return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java index 55daaa1..fd53930 100644 --- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java @@ -143,7 +143,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy if (sstablesWithTombstones.isEmpty()) return Collections.emptyList(); - return Collections.singletonList(Collections.min(sstablesWithTombstones, new 
SSTableReader.SizeComparator())); + return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator)); } private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables) @@ -314,7 +314,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy List<SSTableReader> ssTableReaders = new ArrayList<>(bucket); // Trim the largest sstables off the end to meet the maxThreshold - Collections.sort(ssTableReaders, new SSTableReader.SizeComparator()); + Collections.sort(ssTableReaders, SSTableReader.sizeComparator); return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 91c7ad7..17b1187 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -187,7 +187,7 @@ public class Verifier implements Closeable markAndThrow(); //mimic the scrub read path - try (UnfilteredRowIterator iterator = new SSTableIdentityIterator(sstable, dataFile, key)) + try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key)) { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/Cells.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java index 54df26e..38bde16 100644 --- a/src/java/org/apache/cassandra/db/rows/Cells.java +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -233,6 +233,86 @@ public abstract class Cells return timeDelta; } + /** + * Adds to the builder a 
representation of the given existing cell that, when merged/reconciled with the given + * update cell, produces the same result as merging the original with the update. + * <p> + * For simple cells that is either the original cell (if still live), or nothing (if shadowed). + * + * @param existing the pre-existing cell, the one that is updated. + * @param update the newly added cell, the update. This can be {@code null} out + * of convenience, in which case this function simply copy {@code existing} to + * {@code writer}. + * @param deletion the deletion time that applies to the cells being considered. + * This deletion time may delete both {@code existing} or {@code update}. + * @param builder the row builder to which the result of the filtering is written. + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + */ + public static void addNonShadowed(Cell existing, + Cell update, + DeletionTime deletion, + Row.Builder builder, + int nowInSec) + { + if (deletion.deletes(existing)) + return; + + Cell reconciled = reconcile(existing, update, nowInSec); + if (reconciled != update) + builder.addCell(existing); + } + + /** + * Adds to the builder a representation of the given existing cell that, when merged/reconciled with the given + * update cell, produces the same result as merging the original with the update. + * <p> + * For simple cells that is either the original cell (if still live), or nothing (if shadowed). + * + * @param column the complex column the cells are for. + * @param existing the pre-existing cells, the ones that are updated. + * @param update the newly added cells, the update. This can be {@code null} out + * of convenience, in which case this function simply copy the cells from + * {@code existing} to {@code writer}. 
+ * @param deletion the deletion time that applies to the cells being considered. + * This deletion time may delete both {@code existing} or {@code update}. + * @param builder the row builder to which the result of the filtering is written. + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). + */ + public static void addNonShadowedComplex(ColumnDefinition column, + Iterator<Cell> existing, + Iterator<Cell> update, + DeletionTime deletion, + Row.Builder builder, + int nowInSec) + { + Comparator<CellPath> comparator = column.cellPathComparator(); + Cell nextExisting = getNext(existing); + Cell nextUpdate = getNext(update); + while (nextExisting != null) + { + int cmp = nextUpdate == null ? -1 : comparator.compare(nextExisting.path(), nextUpdate.path()); + if (cmp < 0) + { + addNonShadowed(nextExisting, null, deletion, builder, nowInSec); + nextExisting = getNext(existing); + } + else if (cmp == 0) + { + addNonShadowed(nextExisting, nextUpdate, deletion, builder, nowInSec); + nextExisting = getNext(existing); + nextUpdate = getNext(update); + } + else + { + nextUpdate = getNext(update); + } + } + } + private static Cell getNext(Iterator<Cell> iterator) { return iterator == null || !iterator.hasNext() ? 
null : iterator.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index 4f6c8d2..e6d9062 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -25,6 +25,7 @@ import com.google.common.collect.PeekingIterator; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; +import org.apache.cassandra.db.rows.Row.Deletion; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.WrappedInt; @@ -311,4 +312,80 @@ public abstract class Rows } return timeDelta; } + + /** + * Returns a row that is obtained from the given existing row by removing everything that is shadowed by data in + * the update row. In other words, produces the smallest result row such that + * {@code merge(result, update, nowInSec) == merge(existing, update, nowInSec)} after filtering by rangeDeletion. + * + * @param existing source row + * @param update shadowing row + * @param rangeDeletion extra {@code DeletionTime} from covering tombstone + * @param nowInSec the current time in seconds (which plays a role during reconciliation + * because deleted cells always have precedence on timestamp equality and deciding if a + * cell is a live or not depends on the current time due to expiring cells). 
+ */ + public static Row removeShadowedCells(Row existing, Row update, DeletionTime rangeDeletion, int nowInSec) + { + Row.Builder builder = BTreeRow.sortedBuilder(); + Clustering clustering = existing.clustering(); + builder.newRow(clustering); + + DeletionTime deletion = update.deletion().time(); + if (rangeDeletion.supersedes(deletion)) + deletion = rangeDeletion; + + LivenessInfo existingInfo = existing.primaryKeyLivenessInfo(); + if (!deletion.deletes(existingInfo)) + builder.addPrimaryKeyLivenessInfo(existingInfo); + Row.Deletion rowDeletion = existing.deletion(); + if (!deletion.supersedes(rowDeletion.time())) + builder.addRowDeletion(rowDeletion); + + Iterator<ColumnData> a = existing.iterator(); + Iterator<ColumnData> b = update.iterator(); + ColumnData nexta = a.hasNext() ? a.next() : null, nextb = b.hasNext() ? b.next() : null; + while (nexta != null) + { + int comparison = nextb == null ? -1 : nexta.column.compareTo(nextb.column); + if (comparison <= 0) + { + ColumnData cura = nexta; + ColumnDefinition column = cura.column; + ColumnData curb = comparison == 0 ? nextb : null; + if (column.isSimple()) + { + Cells.addNonShadowed((Cell) cura, (Cell) curb, deletion, builder, nowInSec); + } + else + { + ComplexColumnData existingData = (ComplexColumnData) cura; + ComplexColumnData updateData = (ComplexColumnData) curb; + + DeletionTime existingDt = existingData.complexDeletion(); + DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion(); + + DeletionTime maxDt = updateDt.supersedes(deletion) ? updateDt : deletion; + if (existingDt.supersedes(maxDt)) + { + builder.addComplexDeletion(column, existingDt); + maxDt = existingDt; + } + + Iterator<Cell> existingCells = existingData.iterator(); + Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator(); + Cells.addNonShadowedComplex(column, existingCells, updateCells, maxDt, builder, nowInSec); + } + nexta = a.hasNext() ? 
a.next() : null; + if (curb != null) + nextb = b.hasNext() ? b.next() : null; + } + else + { + nextb = b.hasNext() ? b.next() : null; + } + } + Row row = builder.build(); + return row != null && !row.isEmpty() ? row : null; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index ed6bd12..db18859 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -24,9 +24,11 @@ import com.google.common.collect.Collections2; import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row.Deletion; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.WrappedException; @@ -450,6 +452,58 @@ public class UnfilteredSerializer } } + public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper) + throws IOException + { + while (true) + { + int flags = in.readUnsignedByte(); + if (isEndOfPartition(flags)) + return null; + + int extendedFlags = readExtendedFlags(in, flags); + + if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + ClusteringBoundOrBoundary bound = ClusteringBoundOrBoundary.serializer.deserialize(in, helper.version, header.clusteringTypes()); + return deserializeMarkerBody(in, header, bound); + } + else + { + assert !isStatic(extendedFlags); // deserializeStaticRow 
should be used for that. + if ((flags & HAS_DELETION) != 0) + { + assert header.isForSSTable(); + boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; + boolean hasTTL = (flags & HAS_TTL) != 0; + boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0; + Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes()); + long nextPosition = in.readUnsignedVInt() + in.getFilePointer(); + in.readUnsignedVInt(); // skip previous unfiltered size + if (hasTimestamp) + { + header.readTimestamp(in); + if (hasTTL) + { + header.readTTL(in); + header.readLocalDeletionTime(in); + } + } + + Deletion deletion = new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable); + in.seek(nextPosition); + return BTreeRow.emptyDeletedRow(clustering, deletion); + } + else + { + Clustering.serializer.skip(in, helper.version, header.clusteringTypes()); + skipRowBody(in); + // Continue with next item. + } + } + } + } + public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index de8d69b..1173d40 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -96,7 +96,7 @@ class SASIIndexBuilder extends SecondaryIndexBuilder dataFile.seek(indexEntry.position); ByteBufferUtil.readWithShortLength(dataFile); // key - try (SSTableIdentityIterator partition = new SSTableIdentityIterator(sstable, dataFile, key)) + try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key)) { // if the row has statics 
attached, it has to be indexed separately indexWriter.nextUnfilteredCluster(partition.staticRow()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index a5af334..2a79f88 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -19,15 +19,15 @@ package org.apache.cassandra.io.sstable; import java.io.*; -import org.apache.cassandra.utils.AbstractIterator; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.ByteBufferUtil; -public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator +public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator { private final SSTableReader sstable; private final DecoratedKey key; @@ -37,29 +37,51 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem protected final SSTableSimpleIterator iterator; private final Row staticRow; - /** - * Used to iterate through the columns of a row. - * @param sstable SSTable we are reading ffrom. - * @param file Reading using this file. - * @param key Key of this row. 
- */ - public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key) + public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key, DeletionTime partitionLevelDeletion, + String filename, SSTableSimpleIterator iterator) throws IOException { + super(); this.sstable = sstable; - this.filename = file.getPath(); this.key = key; + this.partitionLevelDeletion = partitionLevelDeletion; + this.filename = filename; + this.iterator = iterator; + this.staticRow = iterator.readStaticRow(); + } + public static SSTableIdentityIterator create(SSTableReader sstable, RandomAccessReader file, DecoratedKey key) + { try { - this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file); + DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file); SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); - this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion); - this.staticRow = iterator.readStaticRow(); + SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion); + return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator); } catch (IOException e) { sstable.markSuspect(); - throw new CorruptSSTableException(e, filename); + throw new CorruptSSTableException(e, file.getPath()); + } + } + + public static SSTableIdentityIterator create(SSTableReader sstable, FileDataInput dfile, RowIndexEntry<?> indexEntry, DecoratedKey key, boolean tombstoneOnly) + { + try + { + dfile.seek(indexEntry.position); + ByteBufferUtil.skipShortLength(dfile); // Skip partition key + DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile); + SerializationHelper helper = new SerializationHelper(sstable.metadata, 
sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); + SSTableSimpleIterator iterator = tombstoneOnly + ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion) + : SSTableSimpleIterator.create(sstable.metadata, dfile, sstable.header, helper, partitionLevelDeletion); + return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, dfile.getPath(), iterator); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, dfile.getPath()); } } @@ -93,7 +115,32 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem return staticRow; } - protected Unfiltered computeNext() + public boolean hasNext() + { + try + { + return iterator.hasNext(); + } + catch (IndexOutOfBoundsException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, filename); + } + catch (IOError e) + { + if (e.getCause() instanceof IOException) + { + sstable.markSuspect(); + throw new CorruptSSTableException((Exception)e.getCause(), filename); + } + else + { + throw e; + } + } + } + + public Unfiltered next() { try { @@ -120,7 +167,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem protected Unfiltered doCompute() { - return iterator.hasNext() ? 
iterator.next() : endOfData(); + return iterator.next(); } public void close() http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java index 2d4314e..ce42126 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java @@ -30,6 +30,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataPosition; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.net.MessagingService; /** @@ -59,6 +60,14 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> return new CurrentFormatIterator(metadata, in, header, helper); } + public static SSTableSimpleIterator createTombstoneOnly(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion) + { + if (helper.version < MessagingService.VERSION_30) + return new OldFormatTombstoneIterator(metadata, in, helper, partitionDeletion); + else + return new CurrentFormatTombstoneIterator(metadata, in, header, helper); + } + public abstract Row readStaticRow() throws IOException; private static class CurrentFormatIterator extends SSTableSimpleIterator @@ -93,6 +102,41 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> } } + private static class CurrentFormatTombstoneIterator extends SSTableSimpleIterator + { + private final SerializationHeader header; + + private CurrentFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) + { + 
super(metadata, in, helper); + this.header = header; + } + + public Row readStaticRow() throws IOException + { + if (header.hasStatic()) + { + Row staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper); + if (!staticRow.deletion().isLive()) + return BTreeRow.emptyDeletedRow(staticRow.clustering(), staticRow.deletion()); + } + return Rows.EMPTY_STATIC_ROW; + } + + protected Unfiltered computeNext() + { + try + { + Unfiltered unfiltered = UnfilteredSerializer.serializer.deserializeTombstonesOnly((FileDataInput) in, header, helper); + return unfiltered == null ? endOfData() : unfiltered; + } + catch (IOException e) + { + throw new IOError(e); + } + } + } + private static class OldFormatIterator extends SSTableSimpleIterator { private final UnfilteredDeserializer deserializer; @@ -163,4 +207,35 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> } + private static class OldFormatTombstoneIterator extends OldFormatIterator + { + private OldFormatTombstoneIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper, DeletionTime partitionDeletion) + { + super(metadata, in, helper, partitionDeletion); + } + + public Row readStaticRow() throws IOException + { + Row row = super.readStaticRow(); + if (!row.deletion().isLive()) + return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion()); + return Rows.EMPTY_STATIC_ROW; + } + + protected Unfiltered computeNext() + { + while (true) + { + Unfiltered unfiltered = super.computeNext(); + if (unfiltered == null || unfiltered.isRangeTombstoneMarker()) + return unfiltered; + + Row row = (Row) unfiltered; + if (!row.deletion().isLive()) + return BTreeRow.emptyDeletedRow(row.clustering(), row.deletion()); + // Otherwise read next. 
+ } + } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index d26edfa..32d3156 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -167,6 +167,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator); + public static final Comparator<SSTableReader> sizeComparator = new Comparator<SSTableReader>() + { + public int compare(SSTableReader o1, SSTableReader o2) + { + return Longs.compare(o1.onDiskLength(), o2.onDiskLength()); + } + }; + /** * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an upper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created @@ -1529,6 +1537,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public abstract UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift); public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift); + public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly); + /** * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists. 
*/ @@ -2016,14 +2026,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return 0; } - public static class SizeComparator implements Comparator<SSTableReader> - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return Longs.compare(o1.onDiskLength(), o2.onDiskLength()); - } - } - public EncodingStats stats() { // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 7a7ce8c..8c64b01 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -30,13 +30,11 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.Component; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; @@ -120,6 +118,13 @@ public class BigTableReader extends SSTableReader } + @SuppressWarnings("resource") // caller to close + @Override + public 
UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly) + { + return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly); + } + /** * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to * allow key selection by token bounds but only if op != * EQ http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 6d31844..66213a6 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -331,7 +331,7 @@ public class BigTableScanner implements ISSTableScanner { dfile.seek(currentEntry.position); ByteBufferUtil.skipShortLength(dfile); // key - return new SSTableIdentityIterator(sstable, dfile, partitionKey()); + return SSTableIdentityIterator.create(sstable, dfile, partitionKey()); } ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/schema/CompactionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java index 720efa3..73271f1 100644 --- a/src/java/org/apache/cassandra/schema/CompactionParams.java +++ b/src/java/org/apache/cassandra/schema/CompactionParams.java @@ -45,7 +45,8 @@ public final class CompactionParams CLASS, ENABLED, MIN_THRESHOLD, - MAX_THRESHOLD; + MAX_THRESHOLD, + PROVIDE_OVERLAPPING_TOMBSTONES; @Override public String toString() @@ -54,27 
+55,38 @@ public final class CompactionParams } } + public enum TombstoneOption + { + NONE, + ROW, + CELL; + } + public static final int DEFAULT_MIN_THRESHOLD = 4; public static final int DEFAULT_MAX_THRESHOLD = 32; public static final boolean DEFAULT_ENABLED = true; + public static final TombstoneOption DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES = + TombstoneOption.valueOf(System.getProperty("default.provide.overlapping.tombstones", TombstoneOption.NONE.toString()).toUpperCase()); public static final Map<String, String> DEFAULT_THRESHOLDS = ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD), Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD)); public static final CompactionParams DEFAULT = - new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED); + new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED, DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES); private final Class<? extends AbstractCompactionStrategy> klass; private final ImmutableMap<String, String> options; private final boolean isEnabled; + private final TombstoneOption tombstoneOption; - private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled) + private CompactionParams(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled, TombstoneOption tombstoneOption) { this.klass = klass; this.options = ImmutableMap.copyOf(options); this.isEnabled = isEnabled; + this.tombstoneOption = tombstoneOption; } public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options) @@ -82,6 +94,8 @@ public final class CompactionParams boolean isEnabled = options.containsKey(Option.ENABLED.toString()) ? 
Boolean.parseBoolean(options.get(Option.ENABLED.toString())) : DEFAULT_ENABLED; + TombstoneOption tombstoneOption = TombstoneOption.valueOf(options.getOrDefault(Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(), + DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES.toString()).toUpperCase()); Map<String, String> allOptions = new HashMap<>(options); if (supportsThresholdParams(klass)) @@ -90,7 +104,7 @@ public final class CompactionParams allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD)); } - return new CompactionParams(klass, allOptions, isEnabled); + return new CompactionParams(klass, allOptions, isEnabled, tombstoneOption); } public static CompactionParams scts(Map<String, String> options) @@ -119,6 +133,11 @@ public final class CompactionParams : Integer.parseInt(threshold); } + public TombstoneOption tombstoneOption() + { + return tombstoneOption; + } + public void validate() { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d64fc04..0a35296 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -75,6 +75,7 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.*; import org.apache.cassandra.repair.*; import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; @@ -2809,6 +2810,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return status.statusCode; } + 
public int garbageCollect(String tombstoneOptionString, int jobs, String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + TombstoneOption tombstoneOption = TombstoneOption.valueOf(tombstoneOptionString); + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; + for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) + { + CompactionManager.AllSSTableOpStatus oneStatus = cfs.garbageCollect(tombstoneOption, jobs); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status.statusCode; + } + /** * Takes the snapshot of a multiple column family from different keyspaces. A snapshot name must be specified. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index f7da817..abb10c1 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -290,6 +290,12 @@ public interface StorageServiceMBean extends NotificationEmitter public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException; /** + * Rewrites all sstables from the given tables to remove deleted data. + * The tombstone option defines the granularity of the procedure: ROW removes deleted partitions and rows, CELL also removes overwritten or deleted cells. + */ + public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException; + + /** * Flush all memtables for the given column families, or all columnfamilies for the given keyspace * if none are explicitly listed. * @param keyspaceName http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 84eeb04..c33dfa4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -73,6 +73,7 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.metrics.ThreadPoolMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingServiceMBean; +import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.CacheServiceMBean; import org.apache.cassandra.service.GCInspector; @@ -259,6 +260,11 @@ public class NodeProbe implements AutoCloseable return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames); } + public int garbageCollect(String tombstoneOption, int jobs, String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException + { + return ssProxy.garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames); + } + private void checkJobs(PrintStream out, int jobs) { if (jobs > DatabaseDescriptor.getConcurrentCompactors()) @@ -301,7 +307,16 @@ public class NodeProbe implements AutoCloseable if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, tableNames) != 0) { failed = true; - out.println("Aborted upgrading sstables for atleast one table in keyspace "+keyspaceName+", check server logs for more information."); + out.println("Aborted upgrading sstables for at least one table in keyspace " + keyspaceName + ", check server logs for more information."); + } + } + + public void garbageCollect(PrintStream out, String tombstoneOption, int jobs, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException + { + if (garbageCollect(tombstoneOption, jobs, keyspaceName, tableNames) != 0) + { + failed = true; + out.println("Aborted garbage collection for at least one table in keyspace " + keyspaceName + ", check server logs for more information."); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 8640b58..cde4ee5 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -64,6 +64,7 @@ public class NodeTool Verify.class, Flush.class, UpgradeSSTable.class, + GarbageCollect.class, DisableAutoCompaction.class, EnableAutoCompaction.class, CompactionStats.class, http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java new file mode 100644 index 0000000..37daf09 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/GarbageCollect.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.airlift.command.Option; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +@Command(name = "garbagecollect", description = "Remove deleted data from one or more tables") +public class GarbageCollect extends NodeToolCmd +{ + @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables") + private List<String> args = new ArrayList<>(); + + @Option(title = "granularity", + name = {"-g", "--granularity"}, + allowedValues = {"ROW", "CELL"}, + description = "Granularity of garbage removal. 
ROW (default) removes deleted partitions and rows, CELL also removes overwritten or deleted cells.") + private String tombstoneOption = "ROW"; + + @Option(title = "jobs", + name = {"-j", "--jobs"}, + description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads") + private int jobs = 2; + + @Override + public void execute(NodeProbe probe) + { + List<String> keyspaces = parseOptionalKeyspace(args, probe); + String[] tableNames = parseOptionalTables(args); + + for (String keyspace : keyspaces) + { + try + { + probe.garbageCollect(System.out, tombstoneOption, jobs, keyspace, tableNames); + } catch (Exception e) + { + throw new RuntimeException("Error occurred during garbage collection", e); + } + } + } +} \ No newline at end of file