Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f33cd55a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f33cd55a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f33cd55a Branch: refs/heads/trunk Commit: f33cd55a5bbf9a8ba0073c606b971d3b3fc85471 Parents: 490c1c2 eb41380 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Fri Nov 18 12:43:04 2016 +0200 Committer: Branimir Lambov <branimir.lam...@datastax.com> Committed: Fri Nov 18 12:44:09 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/db/ReadCommand.java | 5 +- .../db/compaction/CompactionController.java | 46 +++++-- .../db/compaction/CompactionIterator.java | 22 +-- .../db/compaction/CompactionManager.java | 5 +- .../db/compaction/SSTableSplitter.java | 5 +- .../cassandra/db/compaction/Upgrader.java | 5 +- .../cassandra/db/compaction/Verifier.java | 5 +- .../cassandra/db/partitions/PurgeFunction.java | 6 +- .../db/compaction/CompactionControllerTest.java | 21 ++- .../db/compaction/CompactionsPurgeTest.java | 138 ++++++++++++++++++- 11 files changed, 213 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 6ca26f9,8a3ac65..ee73b81 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -150,6 -37,7 +150,8 @@@ Merged from 3.0 * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545) Merged from 2.2: ++======= + * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792) * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk 
values before converting them if not using prepared statements (CASSANDRA-12863) * Fix Util.spinAssertEquals (CASSANDRA-12283) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index b34eee6,34d093e..64c35d9 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -18,13 -18,10 +18,14 @@@ package org.apache.cassandra.db.compaction; import java.util.*; + import java.util.function.Predicate; import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +import com.google.common.base.Predicates; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; @@@ -213,20 -194,24 +214,24 @@@ public class CompactionController imple } /** - * @return the largest timestamp before which it's okay to drop tombstones for the given partition; - * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed - * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not - * participating in this compaction, or memtable that contains this partition, - * or LONG.MAX_VALUE if no SSTable or memtable exist. 
+ * @param key + * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are + * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables + * containing this partition and not participating in the compaction. This means there isn't any data in those + * sstables that might still need to be suppressed by a tombstone at this timestamp. */ - public long maxPurgeableTimestamp(DecoratedKey key) + public Predicate<Long> getPurgeEvaluator(DecoratedKey key) { - if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES) + if (NEVER_PURGE_TOMBSTONES || !compactingRepaired()) - return Long.MIN_VALUE; + return time -> false; - long min = Long.MAX_VALUE; overlapIterator.update(key); - for (SSTableReader sstable : overlapIterator.overlaps()) + Set<SSTableReader> filteredSSTables = overlapIterator.overlaps(); + Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables(); + long minTimestampSeen = Long.MAX_VALUE; + boolean hasTimestamp = false; + - for (SSTableReader sstable : filteredSSTables) ++ for (SSTableReader sstable: filteredSSTables) { // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), // we check index file instead. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index fd1393c,9f0984f..4693794 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@@ -17,9 -17,12 +17,10 @@@ */ package org.apache.cassandra.db.compaction; -import java.util.List; -import java.util.UUID; +import java.util.*; + import java.util.function.Predicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Ordering; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; @@@ -299,251 -292,18 +299,251 @@@ public class CompactionIterator extend } /* - * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable - * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily - * on demand as we only need this if there is tombstones and this a bit expensive (see #8914). + * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum + * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction. + * This is computed lazily on demand as we only need this if there are tombstones and this is a bit expensive + * (see #8914). 
*/ - protected long getMaxPurgeableTimestamp() + protected Predicate<Long> getPurgeEvaluator() { - if (!hasCalculatedMaxPurgeableTimestamp) + if (purgeEvaluator == null) { - hasCalculatedMaxPurgeableTimestamp = true; - maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey); + purgeEvaluator = controller.getPurgeEvaluator(currentKey); } - return maxPurgeableTimestamp; + return purgeEvaluator; } } + + /** + * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition. + * The result produced by this iterator is such that when merged with tombSource it produces the same output + * as the merge of dataSource and tombSource. + */ + private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator + { + final UnfilteredRowIterator tombSource; + final DeletionTime partitionLevelDeletion; + final Row staticRow; + final ColumnFilter cf; + final int nowInSec; + final CFMetaData metadata; + final boolean cellLevelGC; + + DeletionTime tombOpenDeletionTime = DeletionTime.LIVE; + DeletionTime dataOpenDeletionTime = DeletionTime.LIVE; + DeletionTime openDeletionTime = DeletionTime.LIVE; + DeletionTime partitionDeletionTime; + DeletionTime activeDeletionTime; + Unfiltered tombNext = null; + Unfiltered dataNext = null; + Unfiltered next = null; + + /** + * Construct an iterator that filters out data shadowed by the provided "tombstone source". + * + * @param dataSource The input row. The result is a filtered version of this. + * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row. + * @param nowInSec Current time, used in choosing the winner when cell expiration is involved. + * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones. + * If true, deleted or overwritten cells within a surviving row will also be removed. 
+ */ + protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC) + { + super(dataSource); + this.tombSource = tombSource; + this.nowInSec = nowInSec; + this.cellLevelGC = cellLevelGC; + metadata = dataSource.metadata(); + cf = ColumnFilter.all(metadata); + + activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion(); + + // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.) + this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ? + dataSource.partitionLevelDeletion() : + DeletionTime.LIVE; + + Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow()); + this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW; + + tombNext = advance(tombSource); + dataNext = advance(dataSource); + } + + private static Unfiltered advance(UnfilteredRowIterator source) + { + return source.hasNext() ? source.next() : null; + } + + @Override + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public void close() + { + super.close(); + tombSource.close(); + } + + @Override + public Row staticRow() + { + return staticRow; + } + + @Override + public boolean hasNext() + { + // Produce the next element. This may consume multiple elements from both inputs until we find something + // from dataSource that is still live. We track the currently open deletion in both sources, as well as the + // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others + // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion + // range from the input when a suppressing deletion ends. + while (next == null && dataNext != null) + { + int cmp = tombNext == null ? 
-1 : metadata.comparator.compare(dataNext, tombNext); + if (cmp < 0) + { + if (dataNext.isRow()) + next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata); + else + next = processDataMarker(); + } + else if (cmp == 0) + { + if (dataNext.isRow()) + { + next = garbageFilterRow((Row) dataNext, (Row) tombNext); + } + else + { + tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext); + activeDeletionTime = Ordering.natural().max(partitionDeletionTime, + tombOpenDeletionTime); + next = processDataMarker(); + } + } + else // (cmp > 0) + { + if (tombNext.isRangeTombstoneMarker()) + { + tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext); + activeDeletionTime = Ordering.natural().max(partitionDeletionTime, + tombOpenDeletionTime); + boolean supersededBefore = openDeletionTime.isLive(); + boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime); + // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now. + if (supersededBefore && !supersededAfter) + next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime); + // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range. 
+ } + } + + if (next instanceof RangeTombstoneMarker) + openDeletionTime = updateOpenDeletionTime(openDeletionTime, next); + + if (cmp <= 0) + dataNext = advance(wrapped); + if (cmp >= 0) + tombNext = advance(tombSource); + } + return next != null; + } + + protected Row garbageFilterRow(Row dataRow, Row tombRow) + { + if (cellLevelGC) + { + return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec); + } + else + { + DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(), + activeDeletionTime); + return dataRow.filter(cf, deletion, false, metadata); + } + } + + /** + * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on + * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the + * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker. + */ + private RangeTombstoneMarker processDataMarker() + { + dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext); + boolean supersededBefore = openDeletionTime.isLive(); + boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime); + RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext; + if (!supersededBefore) + if (!supersededAfter) + return marker; + else + return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false)); + else + if (!supersededAfter) + return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false)); + else + return null; + } + + @Override + public Unfiltered next() + { + if (!hasNext()) + throw new IllegalStateException(); + + Unfiltered v = next; + next = null; + return v; + } + + private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next) + { + RangeTombstoneMarker marker = (RangeTombstoneMarker) next; + assert openDeletionTime.isLive() == !marker.isClose(false); + assert 
openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false)); + return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE; + } + } + + /** + * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each + * partition using the controller's shadowSources method. + */ + private static class GarbageSkipper extends Transformation<UnfilteredRowIterator> + { + final int nowInSec; + final CompactionController controller; + final boolean cellLevelGC; + + private GarbageSkipper(CompactionController controller, int nowInSec) + { + this.controller = controller; + this.nowInSec = nowInSec; + cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL; + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC); + if (sources == null) + return partition; + List<UnfilteredRowIterator> iters = new ArrayList<>(); + for (UnfilteredRowIterator iter : sources) + { + if (!iter.isEmpty()) + iters.add(iter); + else + iter.close(); + } + if (iters.isEmpty()) + return partition; + + return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a0dc8c9,a77cefb..bc72fd8 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -22,7 -22,7 +22,8 @@@ import java.io.IOException import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; + import 
java.util.function.Predicate; +import java.util.stream.Collectors; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ----------------------------------------------------------------------