Merge branch 'cassandra-3.0' into cassandra-3.X

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f33cd55a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f33cd55a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f33cd55a

Branch: refs/heads/trunk
Commit: f33cd55a5bbf9a8ba0073c606b971d3b3fc85471
Parents: 490c1c2 eb41380
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Fri Nov 18 12:43:04 2016 +0200
Committer: Branimir Lambov <branimir.lam...@datastax.com>
Committed: Fri Nov 18 12:44:09 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/ReadCommand.java    |   5 +-
 .../db/compaction/CompactionController.java     |  46 +++++--
 .../db/compaction/CompactionIterator.java       |  22 +--
 .../db/compaction/CompactionManager.java        |   5 +-
 .../db/compaction/SSTableSplitter.java          |   5 +-
 .../cassandra/db/compaction/Upgrader.java       |   5 +-
 .../cassandra/db/compaction/Verifier.java       |   5 +-
 .../cassandra/db/partitions/PurgeFunction.java  |   6 +-
 .../db/compaction/CompactionControllerTest.java |  21 ++-
 .../db/compaction/CompactionsPurgeTest.java     | 138 ++++++++++++++++++-
 11 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6ca26f9,8a3ac65..ee73b81
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -150,6 -37,7 +150,8 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
+  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotect pk values before converting them if not using prepared statements (CASSANDRA-12863)
   * Fix Util.spinAssertEquals (CASSANDRA-12283)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index b34eee6,34d093e..64c35d9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -18,13 -18,10 +18,14 @@@
  package org.apache.cassandra.db.compaction;
  
  import java.util.*;
+ import java.util.function.Predicate;
  
  import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +
 +import com.google.common.base.Predicates;
  import com.google.common.collect.Iterables;
 +import com.google.common.util.concurrent.RateLimiter;
  
  import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -213,20 -194,24 +214,24 @@@ public class CompactionController imple
      }
  
      /**
-      * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
-      * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
-      * in other sstables.  This returns the minimum timestamp for any SSTable that contains this partition and is not
-      * participating in this compaction, or memtable that contains this partition,
-      * or LONG.MAX_VALUE if no SSTable or memtable exist.
+      * @param key the partition key
+      * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are
+      * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables
+      * containing this partition and not participating in the compaction. This means there isn't any data in those
+      * sstables that might still need to be suppressed by a tombstone at this timestamp.
       */
-     public long maxPurgeableTimestamp(DecoratedKey key)
+     public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
      {
 -        if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES)
 +        if (NEVER_PURGE_TOMBSTONES || !compactingRepaired())
-             return Long.MIN_VALUE;
+             return time -> false;
  
-         long min = Long.MAX_VALUE;
          overlapIterator.update(key);
-         for (SSTableReader sstable : overlapIterator.overlaps())
+         Set<SSTableReader> filteredSSTables = overlapIterator.overlaps();
+         Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables();
+         long minTimestampSeen = Long.MAX_VALUE;
+         boolean hasTimestamp = false;
+ 
 -        for (SSTableReader sstable : filteredSSTables)
++        for (SSTableReader sstable: filteredSSTables)
          {
              // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
              // we check index file instead.
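
For readers tracking the API change above: maxPurgeableTimestamp(key) returning a
long becomes getPurgeEvaluator(key) returning a Predicate<Long>. The tail of the
method is not shown in this excerpt, but based on the variables visible here
(minTimestampSeen, hasTimestamp), a minimal standalone sketch of the idea is below.
It is an illustration only, not the commit's exact code; TimestampSource and its
methods are hypothetical stand-ins for the sstable bloom-filter/index and memtable
checks.

    import java.util.List;
    import java.util.function.Predicate;

    class PurgeEvaluatorSketch
    {
        // Hypothetical stand-in for an sstable or memtable that may hold the key.
        interface TimestampSource
        {
            boolean mayContain(Object key);
            long minTimestamp();
        }

        static Predicate<Long> getPurgeEvaluator(Object key, List<TimestampSource> overlaps, boolean neverPurge)
        {
            if (neverPurge)
                return time -> false; // mirrors the NEVER_PURGE_TOMBSTONES early-out above

            long minTimestampSeen = Long.MAX_VALUE;
            boolean hasTimestamp = false;
            for (TimestampSource source : overlaps)
            {
                if (source.mayContain(key))
                {
                    minTimestampSeen = Math.min(minTimestampSeen, source.minTimestamp());
                    hasTimestamp = true;
                }
            }
            // A tombstone is purgeable only if it is older than everything outside
            // this compaction that could still hold data it shadows. With no overlap
            // at all the predicate is always true, which is what the old
            // "timestamp < maxPurgeableTimestamp" comparison could not express for
            // tombstones written at Long.MAX_VALUE (CASSANDRA-12792).
            long min = minTimestampSeen;
            boolean any = hasTimestamp;
            return time -> !any || time < min;
        }
    }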

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index fd1393c,9f0984f..4693794
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -17,9 -17,12 +17,10 @@@
   */
  package org.apache.cassandra.db.compaction;
  
 -import java.util.List;
 -import java.util.UUID;
 +import java.util.*;
+ import java.util.function.Predicate;
  
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import com.google.common.collect.Ordering;
  
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.*;
@@@ -299,251 -292,18 +299,251 @@@ public class CompactionIterator extend
          }
  
          /*
-          * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
-          * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily
-          * on demand as we only need this if there is tombstones and this a bit expensive (see #8914).
+          * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum
+          * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
+          * This is computed lazily on demand as we only need this if there are tombstones and it is a bit expensive
+          * (see #8914).
           */
-         protected long getMaxPurgeableTimestamp()
+         protected Predicate<Long> getPurgeEvaluator()
          {
-             if (!hasCalculatedMaxPurgeableTimestamp)
+             if (purgeEvaluator == null)
              {
-                 hasCalculatedMaxPurgeableTimestamp = true;
-                 maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey);
+                 purgeEvaluator = controller.getPurgeEvaluator(currentKey);
              }
-             return maxPurgeableTimestamp;
+             return purgeEvaluator;
          }
      }
 +
 +    /**
 +     * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition.
 +     * The result produced by this iterator is such that when merged with tombSource it produces the same output
 +     * as the merge of dataSource and tombSource.
 +     */
 +    private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
 +    {
 +        final UnfilteredRowIterator tombSource;
 +        final DeletionTime partitionLevelDeletion;
 +        final Row staticRow;
 +        final ColumnFilter cf;
 +        final int nowInSec;
 +        final CFMetaData metadata;
 +        final boolean cellLevelGC;
 +
 +        DeletionTime tombOpenDeletionTime = DeletionTime.LIVE;
 +        DeletionTime dataOpenDeletionTime = DeletionTime.LIVE;
 +        DeletionTime openDeletionTime = DeletionTime.LIVE;
 +        DeletionTime partitionDeletionTime;
 +        DeletionTime activeDeletionTime;
 +        Unfiltered tombNext = null;
 +        Unfiltered dataNext = null;
 +        Unfiltered next = null;
 +
 +        /**
 +         * Construct an iterator that filters out data shadowed by the provided "tombstone source".
 +         *
 +         * @param dataSource The input row. The result is a filtered version of this.
 +         * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row.
 +         * @param nowInSec Current time, used in choosing the winner when cell expiration is involved.
 +         * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones.
 +         *                    If true, deleted or overwritten cells within a surviving row will also be removed.
 +         */
 +        protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC)
 +        {
 +            super(dataSource);
 +            this.tombSource = tombSource;
 +            this.nowInSec = nowInSec;
 +            this.cellLevelGC = cellLevelGC;
 +            metadata = dataSource.metadata();
 +            cf = ColumnFilter.all(metadata);
 +
 +            activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion();
 +
 +            // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.)
 +            this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ?
 +                    dataSource.partitionLevelDeletion() :
 +                    DeletionTime.LIVE;
 +
 +            Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow());
 +            this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW;
 +
 +            tombNext = advance(tombSource);
 +            dataNext = advance(dataSource);
 +        }
 +
 +        private static Unfiltered advance(UnfilteredRowIterator source)
 +        {
 +            return source.hasNext() ? source.next() : null;
 +        }
 +
 +        @Override
 +        public DeletionTime partitionLevelDeletion()
 +        {
 +            return partitionLevelDeletion;
 +        }
 +
 +        public void close()
 +        {
 +            super.close();
 +            tombSource.close();
 +        }
 +
 +        @Override
 +        public Row staticRow()
 +        {
 +            return staticRow;
 +        }
 +
 +        @Override
 +        public boolean hasNext()
 +        {
 +            // Produce the next element. This may consume multiple elements from both inputs until we find something
 +            // from dataSource that is still live. We track the currently open deletion in both sources, as well as the
 +            // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others
 +            // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion
 +            // range from the input when a suppressing deletion ends.
 +            while (next == null && dataNext != null)
 +            {
 +                int cmp = tombNext == null ? -1 : metadata.comparator.compare(dataNext, tombNext);
 +                if (cmp < 0)
 +                {
 +                    if (dataNext.isRow())
 +                        next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata);
 +                    else
 +                        next = processDataMarker();
 +                }
 +                else if (cmp == 0)
 +                {
 +                    if (dataNext.isRow())
 +                    {
 +                        next = garbageFilterRow((Row) dataNext, (Row) tombNext);
 +                    }
 +                    else
 +                    {
 +                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
 +                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
 +                                                                    tombOpenDeletionTime);
 +                        next = processDataMarker();
 +                    }
 +                }
 +                else // (cmp > 0)
 +                {
 +                    if (tombNext.isRangeTombstoneMarker())
 +                    {
 +                        tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
 +                        activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
 +                                                                    tombOpenDeletionTime);
 +                        boolean supersededBefore = openDeletionTime.isLive();
 +                        boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
 +                        // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now.
 +                        if (supersededBefore && !supersededAfter)
 +                            next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime);
 +                        // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range.
 +                    }
 +                }
 +
 +                if (next instanceof RangeTombstoneMarker)
 +                    openDeletionTime = updateOpenDeletionTime(openDeletionTime, next);
 +
 +                if (cmp <= 0)
 +                    dataNext = advance(wrapped);
 +                if (cmp >= 0)
 +                    tombNext = advance(tombSource);
 +            }
 +            return next != null;
 +        }
 +
 +        protected Row garbageFilterRow(Row dataRow, Row tombRow)
 +        {
 +            if (cellLevelGC)
 +            {
 +                return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec);
 +            }
 +            else
 +            {
 +                DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(),
 +                                                               activeDeletionTime);
 +                return dataRow.filter(cf, deletion, false, metadata);
 +            }
 +        }
 +
 +        /**
 +         * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on
 +         * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the
 +         * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker.
 +         */
 +        private RangeTombstoneMarker processDataMarker()
 +        {
 +            dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext);
 +            boolean supersededBefore = openDeletionTime.isLive();
 +            boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
 +            RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext;
 +            if (!supersededBefore)
 +                if (!supersededAfter)
 +                    return marker;
 +                else
 +                    return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false));
 +            else
 +                if (!supersededAfter)
 +                    return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false));
 +                else
 +                    return null;
 +        }
 +
 +        @Override
 +        public Unfiltered next()
 +        {
 +            if (!hasNext())
 +                throw new IllegalStateException();
 +
 +            Unfiltered v = next;
 +            next = null;
 +            return v;
 +        }
 +
 +        private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next)
 +        {
 +            RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
 +            assert openDeletionTime.isLive() == !marker.isClose(false);
 +            assert openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false));
 +            return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
 +        }
 +    }
 +
 +    /**
 +     * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each
 +     * partition using the controller's shadowSources method.
 +     */
 +    private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
 +    {
 +        final int nowInSec;
 +        final CompactionController controller;
 +        final boolean cellLevelGC;
 +
 +        private GarbageSkipper(CompactionController controller, int nowInSec)
 +        {
 +            this.controller = controller;
 +            this.nowInSec = nowInSec;
 +            cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
 +        }
 +
 +        @Override
 +        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +        {
 +            Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC);
 +            if (sources == null)
 +                return partition;
 +            List<UnfilteredRowIterator> iters = new ArrayList<>();
 +            for (UnfilteredRowIterator iter : sources)
 +            {
 +                if (!iter.isEmpty())
 +                    iters.add(iter);
 +                else
 +                    iter.close();
 +            }
 +            if (iters.isEmpty())
 +                return partition;
 +
 +            return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC);
 +        }
 +    }
  }
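
The GarbageSkippingUnfilteredRowIterator javadoc above states the correctness
contract: merging the filtered output with tombSource must produce the same result
as merging dataSource with tombSource. A toy model, far simpler than Cassandra's
Unfiltered machinery, makes that contract concrete; here a partition is just a map
of key -> write timestamp and the tombstone source a map of key -> deletion
timestamp. All names below are hypothetical, for illustration only.

    import java.util.HashMap;
    import java.util.Map;

    class GarbageSkipModel
    {
        // Drop entries the tombstone source already deletes (written at or before the deletion).
        static Map<String, Long> skipGarbage(Map<String, Long> data, Map<String, Long> tombs)
        {
            Map<String, Long> out = new HashMap<>();
            data.forEach((key, writtenAt) -> {
                Long deletedAt = tombs.get(key);
                if (deletedAt == null || writtenAt > deletedAt)
                    out.put(key, writtenAt); // survives: nothing in tombs shadows it
            });
            return out;
        }

        public static void main(String[] args)
        {
            Map<String, Long> data = new HashMap<>();
            data.put("a", 10L); // shadowed: deleted at 20
            data.put("b", 30L); // survives: written after the deletion
            Map<String, Long> tombs = new HashMap<>();
            tombs.put("a", 20L);

            // In this model, a reader's view of (data, tombs) is skipGarbage(data, tombs),
            // so the contract becomes: filtering first must not change that view.
            Map<String, Long> direct = skipGarbage(data, tombs);
            Map<String, Long> filteredFirst = skipGarbage(skipGarbage(data, tombs), tombs);
            System.out.println(direct.equals(filteredFirst)); // true
            System.out.println(direct.keySet());              // [b]
        }
    }

The real iterator has to uphold the same property while streaming range tombstone
markers in clustering order, which is why hasNext() tracks an open deletion time on
each side rather than doing a point lookup per key.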

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a0dc8c9,a77cefb..bc72fd8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -22,7 -22,7 +22,8 @@@ import java.io.IOException
  import java.lang.management.ManagementFactory;
  import java.util.*;
  import java.util.concurrent.*;
+ import java.util.function.Predicate;
 +import java.util.stream.Collectors;
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
  import javax.management.openmbean.OpenDataException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
