Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 51e6f2446 -> 68bdf4547
  refs/heads/cassandra-3.11 0e5c84ad8 -> 3e3d56ecd
  refs/heads/trunk 756fab87c -> 00022bfa6


Handle limit correctly on tables with strict liveness

Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-13883


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68bdf454
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68bdf454
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68bdf454

Branch: refs/heads/cassandra-3.0
Commit: 68bdf45477417c97fa6ed3840eee39b8390fd678
Parents: 51e6f24
Author: Zhao Yang <zhaoyangsingap...@gmail.com>
Authored: Tue Sep 19 18:35:24 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Mon Sep 25 01:00:45 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  5 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  3 +-
 .../db/SinglePartitionReadCommand.java          |  9 ++-
 .../apache/cassandra/db/filter/DataLimits.java  | 81 ++++++++++++++------
 .../db/partitions/CachedBTreePartition.java     |  3 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |  6 +-
 src/java/org/apache/cassandra/db/rows/Row.java  |  7 +-
 .../cassandra/db/view/ViewUpdateGenerator.java  |  6 +-
 .../composites/ClusteringColumnIndex.java       |  5 +-
 .../internal/composites/PartitionKeyIndex.java  |  4 +-
 .../apache/cassandra/service/DataResolver.java  |  7 +-
 .../apache/cassandra/service/StorageProxy.java  | 12 ++-
 .../service/pager/AbstractQueryPager.java       |  4 +-
 .../service/pager/MultiPartitionPager.java      | 12 ++-
 .../cassandra/service/pager/QueryPagers.java    |  2 +-
 .../apache/cassandra/cql3/ViewComplexTest.java  | 65 +++++++++++++++-
 .../org/apache/cassandra/cql3/ViewTest.java     |  2 +
 .../apache/cassandra/db/RangeTombstoneTest.java | 34 +++++---
 .../db/compaction/CompactionsPurgeTest.java     |  3 +-
 20 files changed, 213 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68da81a..9cba02b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
  * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
  * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
  * Improve short read protection performance (CASSANDRA-13794)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e6e46b2..0d19856 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1495,7 +1495,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         // is not in the cache. We can guarantee that if either the filter is 
a "head filter" and the cached
         // partition has more live rows that queried (where live rows refers 
to the rows that are live now),
         // or if we can prove that everything the filter selects is in the 
cached partition based on its content.
-        return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, 
nowInSec, filter.selectsAllPartition()))
+        return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached,
+                                                                  nowInSec,
+                                                                  
filter.selectsAllPartition(),
+                                                                  
metadata.enforceStrictLiveness()))
                 || filter.isFullyCoveredBy(cached);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index b73cdde..160b104 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -450,6 +450,7 @@ public abstract class ReadCommand implements ReadQuery
             private final int warningThreshold = 
DatabaseDescriptor.getTombstoneWarnThreshold();
 
             private final boolean respectTombstoneThresholds = 
!Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+            private final boolean enforceStrictLiveness = 
metadata.enforceStrictLiveness();
 
             private int liveRows = 0;
             private int tombstones = 0;
@@ -472,7 +473,7 @@ public abstract class ReadCommand implements ReadQuery
             @Override
             public Row applyToRow(Row row)
             {
-                if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                if (row.hasLiveData(ReadCommand.this.nowInSec(), 
enforceStrictLiveness))
                     ++liveRows;
 
                 for (Cell cell : row.cells())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 00464ca..7a66eca 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -556,6 +556,7 @@ public class SinglePartitionReadCommand extends ReadCommand
             try
             {
                 final int rowsToCache = 
metadata().params.caching.rowsPerPartitionToCache();
+                final boolean enforceStrictLiveness = 
metadata().enforceStrictLiveness();
 
                 @SuppressWarnings("resource") // we close on exception or upon 
closing the result of this method
                 UnfilteredRowIterator iter = fullPartitionRead(metadata(), 
nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
@@ -579,7 +580,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                             if (unfiltered.isRow())
                             {
                                 Row row = (Row) unfiltered;
-                                if (row.hasLiveData(nowInSec()))
+                                if (row.hasLiveData(nowInSec(), 
enforceStrictLiveness))
                                     rowsCounted++;
                             }
                             return unfiltered;
@@ -1161,10 +1162,14 @@ public class SinglePartitionReadCommand extends 
ReadCommand
             for (SinglePartitionReadCommand cmd : commands)
                 partitions.add(cmd.executeInternal(orderGroup));
 
+            // Note that the only difference between the commands in a group 
must be the partition key on which
+            // they apply.
+            boolean enforceStrictLiveness = 
commands.get(0).metadata().enforceStrictLiveness();
             // Because we only have enforce the limit per command, we need to 
enforce it globally.
             return limits.filter(PartitionIterators.concat(partitions),
                                  nowInSec,
-                                 selectsFullPartitions);
+                                 selectsFullPartitions,
+                                 enforceStrictLiveness);
         }
 
         public QueryPager getPager(PagingState pagingState, int 
protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java 
b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 48ec06a..6b74293 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.filter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
@@ -46,7 +47,7 @@ public abstract class DataLimits
     public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
     {
         @Override
-        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData)
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
             return false;
         }
@@ -109,7 +110,10 @@ public abstract class DataLimits
 
     public abstract DataLimits forShortReadRetry(int toFetch);
 
-    public abstract boolean hasEnoughLiveData(CachedPartition cached, int 
nowInSec, boolean countPartitionsWithOnlyStaticData);
+    public abstract boolean hasEnoughLiveData(CachedPartition cached,
+                                              int nowInSec,
+                                              boolean 
countPartitionsWithOnlyStaticData,
+                                              boolean enforceStrictLiveness);
 
     /**
      * Returns a new {@code Counter} for this limits.
@@ -120,9 +124,14 @@ public abstract class DataLimits
      * {@code RowIterator} (since it only returns live rows), false otherwise.
      * @param countPartitionsWithOnlyStaticData if {@code true} the partitions 
with only static data should be counted
      * as 1 valid row.
+     * @param enforceStrictLiveness whether the row should be purged if there 
is no PK liveness info,
+     *                              normally retrieved from {@link 
CFMetaData#enforceStrictLiveness()}
      * @return a new {@code Counter} for this limits.
      */
-    public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData);
+    public abstract Counter newCounter(int nowInSec,
+                                       boolean assumeLiveData,
+                                       boolean 
countPartitionsWithOnlyStaticData,
+                                       boolean enforceStrictLiveness);
 
     /**
      * The max number of results this limits enforces.
@@ -140,19 +149,27 @@ public abstract class DataLimits
                                               int nowInSec,
                                               boolean 
countPartitionsWithOnlyStaticData)
     {
-        return this.newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData).applyTo(iter);
+        return this.newCounter(nowInSec,
+                               false,
+                               countPartitionsWithOnlyStaticData,
+                               iter.metadata().enforceStrictLiveness())
+                   .applyTo(iter);
     }
 
     public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
                                         int nowInSec,
                                         boolean 
countPartitionsWithOnlyStaticData)
     {
-        return this.newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData).applyTo(iter);
+        return this.newCounter(nowInSec,
+                               false,
+                               countPartitionsWithOnlyStaticData,
+                               iter.metadata().enforceStrictLiveness())
+                   .applyTo(iter);
     }
 
-    public PartitionIterator filter(PartitionIterator iter, int nowInSec, 
boolean countPartitionsWithOnlyStaticData)
+    public PartitionIterator filter(PartitionIterator iter, int nowInSec, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
     {
-        return this.newCounter(nowInSec, true, 
countPartitionsWithOnlyStaticData).applyTo(iter);
+        return this.newCounter(nowInSec, true, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness).applyTo(iter);
     }
 
     /**
@@ -301,7 +318,7 @@ public abstract class DataLimits
             return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
         }
 
-        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData)
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
             // We want the number of row that are currently live. Getting that 
precise number forces
             // us to iterate the cached partition in general, but we can avoid 
that if:
@@ -316,7 +333,7 @@ public abstract class DataLimits
 
             // Otherwise, we need to re-count
 
-            DataLimits.Counter counter = newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData);
+            DataLimits.Counter counter = newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness);
             try (UnfilteredRowIterator cacheIter = 
cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, 
false);
                  UnfilteredRowIterator iter = counter.applyTo(cacheIter))
             {
@@ -327,9 +344,12 @@ public abstract class DataLimits
             }
         }
 
-        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData)
+        public Counter newCounter(int nowInSec,
+                                  boolean assumeLiveData,
+                                  boolean countPartitionsWithOnlyStaticData,
+                                  boolean enforceStrictLiveness)
         {
-            return new CQLCounter(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData);
+            return new CQLCounter(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness);
         }
 
         public int count()
@@ -360,25 +380,30 @@ public abstract class DataLimits
             protected int rowInCurrentPartition;
 
             protected boolean hasLiveStaticRow;
+            private final boolean enforceStrictLiveness;
 
-            public CQLCounter(int nowInSec, boolean assumeLiveData, boolean 
countPartitionsWithOnlyStaticData)
+            public CQLCounter(int nowInSec,
+                              boolean assumeLiveData,
+                              boolean countPartitionsWithOnlyStaticData,
+                              boolean enforceStrictLiveness)
             {
                 this.nowInSec = nowInSec;
                 this.assumeLiveData = assumeLiveData;
                 this.countPartitionsWithOnlyStaticData = 
countPartitionsWithOnlyStaticData;
+                this.enforceStrictLiveness = enforceStrictLiveness;
             }
 
             @Override
             public void applyToPartition(DecoratedKey partitionKey, Row 
staticRow)
             {
                 rowInCurrentPartition = 0;
-                hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || 
staticRow.hasLiveData(nowInSec));
+                hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || 
staticRow.hasLiveData(nowInSec, enforceStrictLiveness));
             }
 
             @Override
             public Row applyToRow(Row row)
             {
-                if (assumeLiveData || row.hasLiveData(nowInSec))
+                if (assumeLiveData || row.hasLiveData(nowInSec, 
enforceStrictLiveness))
                     incrementRowCount();
                 return row;
             }
@@ -473,16 +498,19 @@ public abstract class DataLimits
         }
 
         @Override
-        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData)
+        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
-            return new PagingAwareCounter(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData);
+            return new PagingAwareCounter(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness);
         }
 
         private class PagingAwareCounter extends CQLCounter
         {
-            private PagingAwareCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData)
+            private PagingAwareCounter(int nowInSec,
+                                       boolean assumeLiveData,
+                                       boolean 
countPartitionsWithOnlyStaticData,
+                                       boolean enforceStrictLiveness)
             {
-                super(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData);
+                super(nowInSec, assumeLiveData, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness);
             }
 
             @Override
@@ -556,7 +584,7 @@ public abstract class DataLimits
             return new ThriftLimits(1, toFetch);
         }
 
-        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData)
+        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
             // We want the number of cells that are currently live. Getting 
that precise number forces
             // us to iterate the cached partition in general, but we can avoid 
that if:
@@ -570,7 +598,7 @@ public abstract class DataLimits
                 return false;
 
             // Otherwise, we need to re-count
-            DataLimits.Counter counter = newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData);
+            DataLimits.Counter counter = newCounter(nowInSec, false, 
countPartitionsWithOnlyStaticData, enforceStrictLiveness);
             try (UnfilteredRowIterator cacheIter = 
cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, 
false);
                  UnfilteredRowIterator iter = counter.applyTo(cacheIter))
             {
@@ -581,7 +609,7 @@ public abstract class DataLimits
             }
         }
 
-        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData)
+        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
             return new ThriftCounter(nowInSec, assumeLiveData);
         }
@@ -711,23 +739,26 @@ public abstract class DataLimits
         }
 
         @Override
-        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData)
+        public Counter newCounter(int nowInSec, boolean assumeLiveData, 
boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
         {
-            return new SuperColumnCountingCounter(nowInSec, assumeLiveData);
+            return new SuperColumnCountingCounter(nowInSec, assumeLiveData, 
enforceStrictLiveness);
         }
 
         protected class SuperColumnCountingCounter extends ThriftCounter
         {
-            public SuperColumnCountingCounter(int nowInSec, boolean 
assumeLiveData)
+            private final boolean enforceStrictLiveness;
+
+            public SuperColumnCountingCounter(int nowInSec, boolean 
assumeLiveData, boolean enforceStrictLiveness)
             {
                 super(nowInSec, assumeLiveData);
+                this.enforceStrictLiveness = enforceStrictLiveness;
             }
 
             @Override
             public Row applyToRow(Row row)
             {
                 // In the internal format, a row == a super column, so that's 
what we want to count.
-                if (assumeLiveData || row.hasLiveData(nowInSec))
+                if (assumeLiveData || row.hasLiveData(nowInSec, 
enforceStrictLiveness))
                 {
                     ++cellsCounted;
                     if (++cellsInCurrentPartition >= cellPerPartitionLimit)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index afe1cc3..9c6ab59 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -91,10 +91,11 @@ public class CachedBTreePartition extends 
ImmutableBTreePartition implements Cac
         int rowsWithNonExpiringCells = 0;
         int nonTombstoneCellCount = 0;
         int nonExpiringLiveCells = 0;
+        boolean enforceStrictLiveness = 
iterator.metadata().enforceStrictLiveness();
 
         for (Row row : BTree.<Row>iterable(holder.tree))
         {
-            if (row.hasLiveData(nowInSec))
+            if (row.hasLiveData(nowInSec, enforceStrictLiveness))
                 ++cachedLiveRows;
 
             int nonExpiringLiveCellsThisRow = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java 
b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 59addeb..67ed219 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -42,11 +42,13 @@ public abstract class AbstractRow extends 
AbstractCollection<ColumnData> impleme
         return Unfiltered.Kind.ROW;
     }
 
-    public boolean hasLiveData(int nowInSec)
+    @Override
+    public boolean hasLiveData(int nowInSec, boolean enforceStrictLiveness)
     {
         if (primaryKeyLivenessInfo().isLive(nowInSec))
             return true;
-
+        else if (enforceStrictLiveness)
+            return false;
         return Iterables.any(cells(), cell -> cell.isLive(nowInSec));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java
index 3bcc220..3c97e09 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -103,8 +103,13 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
 
     /**
      * Whether the row has some live information (i.e. it's not just deletion 
informations).
+     * 
+     * @param nowInSec the current time to decide what is deleted and what 
isn't
+     * @param enforceStrictLiveness whether the row should be purged if there 
is no PK liveness info,
+     *                              normally retrieved from {@link 
CFMetaData#enforceStrictLiveness()}
+     * @return true if there is some live information
      */
-    public boolean hasLiveData(int nowInSec);
+    public boolean hasLiveData(int nowInSec, boolean enforceStrictLiveness);
 
     /**
      * Returns a cell for a simple column.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java 
b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 0c8e078..341c511 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -51,6 +51,7 @@ public class ViewUpdateGenerator
     private final ByteBuffer[] basePartitionKey;
 
     private final CFMetaData viewMetadata;
+    private final boolean baseEnforceStrictLiveness;
 
     private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
 
@@ -87,6 +88,7 @@ public class ViewUpdateGenerator
         this.nowInSec = nowInSec;
 
         this.baseMetadata = view.getDefinition().baseTableMetadata();
+        this.baseEnforceStrictLiveness = baseMetadata.enforceStrictLiveness();
         this.baseDecoratedKey = basePartitionKey;
         this.basePartitionKey = extractKeyComponents(basePartitionKey, 
baseMetadata.getKeyValidator());
 
@@ -186,8 +188,8 @@ public class ViewUpdateGenerator
             // The view entry is necessarily the same pre and post update.
 
             // Note that we allow existingBaseRow to be null and treat it as 
empty (see MultiViewUpdateBuilder.generateViewsMutations).
-            boolean existingHasLiveData = existingBaseRow != null && 
existingBaseRow.hasLiveData(nowInSec);
-            boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec);
+            boolean existingHasLiveData = existingBaseRow != null && 
existingBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness);
+            boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec, 
baseEnforceStrictLiveness);
             return existingHasLiveData
                  ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : 
UpdateAction.DELETE_OLD)
                  : (mergedHasLiveData ? UpdateAction.NEW_ENTRY : 
UpdateAction.NONE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
 
b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
index b932602..cace6de 100644
--- 
a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
+++ 
b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
@@ -45,9 +45,12 @@ import org.apache.cassandra.schema.IndexMetadata;
  */
 public class ClusteringColumnIndex extends CassandraIndex
 {
+    private final boolean enforceStrictLiveness;
+
     public ClusteringColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata 
indexDef)
     {
         super(baseCfs, indexDef);
+        this.enforceStrictLiveness = baseCfs.metadata.enforceStrictLiveness();
     }
 
 
@@ -95,6 +98,6 @@ public class ClusteringColumnIndex extends CassandraIndex
 
     public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        return !data.hasLiveData(nowInSec);
+        return !data.hasLiveData(nowInSec, enforceStrictLiveness);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
 
b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
index 2c0b5aa..d854102 100644
--- 
a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
+++ 
b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
@@ -47,9 +47,11 @@ import org.apache.cassandra.schema.IndexMetadata;
  */
 public class PartitionKeyIndex extends CassandraIndex
 {
+    private final boolean enforceStrictLiveness;
     public PartitionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
     {
         super(baseCfs, indexDef);
+        this.enforceStrictLiveness = baseCfs.metadata.enforceStrictLiveness();
     }
 
     public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
@@ -90,6 +92,6 @@ public class PartitionKeyIndex extends CassandraIndex
 
     public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
     {
-        return !data.hasLiveData(nowInSec);
+        return !data.hasLiveData(nowInSec, enforceStrictLiveness);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index f1eedd1..61bffe5 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,9 +44,12 @@ public class DataResolver extends ResponseResolver
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
 
+    private final boolean enforceStrictLiveness;
+
     DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel 
consistency, int maxResponseCount)
     {
         super(keyspace, command, consistency, maxResponseCount);
+        this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
     }
 
     public PartitionIterator getData()
@@ -98,7 +101,7 @@ public class DataResolver extends ResponseResolver
          */
 
         DataLimits.Counter mergedResultCounter =
-            command.limits().newCounter(command.nowInSec(), true, 
command.selectsFullPartition());
+            command.limits().newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
 
         UnfilteredPartitionIterator merged = 
mergeWithShortReadProtection(iters, sources, mergedResultCounter);
         FilteredPartitions filtered =
@@ -125,7 +128,7 @@ public class DataResolver extends ResponseResolver
             for (int i = 0; i < results.size(); i++)
             {
                 DataLimits.Counter singleResultCounter =
-                    command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition()).onlyCount();
+                    command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
 
                 ShortReadResponseProtection protection =
                     new ShortReadResponseProtection(sources[i], 
singleResultCounter, mergedResultCounter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 6bf275d..5af2ad0 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1598,10 +1598,13 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             PartitionIterator result = fetchRows(group.commands, 
consistencyLevel);
+            // Note that the only difference between the commands in a group 
must be the partition key on which
+            // they apply.
+            boolean enforceStrictLiveness = 
group.commands.get(0).metadata().enforceStrictLiveness();
             // If we have more than one command, then despite each read 
command honoring the limit, the total result
             // might not honor it and so we should enforce it
             if (group.commands.size() > 1)
-                result = group.limits().filter(result, group.nowInSec(), 
group.selectsFullPartition());
+                result = group.limits().filter(result, group.nowInSec(), 
group.selectsFullPartition(), enforceStrictLiveness);
             return result;
         }
         catch (UnavailableException e)
@@ -1995,6 +1998,7 @@ public class StorageProxy implements StorageProxyMBean
         private final PartitionRangeReadCommand command;
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
+        private final boolean enforceStrictLiveness;
 
         private final long startTime;
         private DataLimits.Counter counter;
@@ -2015,6 +2019,7 @@ public class StorageProxy implements StorageProxyMBean
             this.totalRangeCount = ranges.rangeCount();
             this.consistency = consistency;
             this.keyspace = keyspace;
+            this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
         }
 
         public RowIterator computeNext()
@@ -2119,7 +2124,7 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("Submitted {} concurrent range requests", 
concurrentQueries.size());
             // We want to count the results for the sake of updating the 
concurrency factor (see updateConcurrencyFactor) but we don't want to
             // enforce any particular limit at this point (this could break 
code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
-            counter = DataLimits.NONE.newCounter(command.nowInSec(), true, 
command.selectsFullPartition());
+            counter = DataLimits.NONE.newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
             return 
counter.applyTo(PartitionIterators.concat(concurrentQueries));
         }
 
@@ -2163,7 +2168,8 @@ public class StorageProxy implements StorageProxyMBean
 
         return 
command.limits().filter(command.postReconciliationProcessing(new 
RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, 
consistencyLevel)),
                                        command.nowInSec(),
-                                       command.selectsFullPartition());
+                                       command.selectsFullPartition(),
+                                       
command.metadata().enforceStrictLiveness());
     }
 
     public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index ffd1b82..f44aa24 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -32,6 +32,7 @@ abstract class AbstractQueryPager implements QueryPager
     protected final ReadCommand command;
     protected final DataLimits limits;
     protected final int protocolVersion;
+    private final boolean enforceStrictLiveness;
 
     private int remaining;
 
@@ -48,6 +49,7 @@ abstract class AbstractQueryPager implements QueryPager
         this.command = command;
         this.protocolVersion = protocolVersion;
         this.limits = command.limits();
+        this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
 
         this.remaining = limits.count();
         this.remainingInPartition = limits.perPartitionCount();
@@ -126,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager
 
         private Pager(DataLimits pageLimits, int nowInSec)
         {
-            this.counter = pageLimits.newCounter(nowInSec, true, 
command.selectsFullPartition());
+            this.counter = pageLimits.newCounter(nowInSec, true, 
command.selectsFullPartition(), enforceStrictLiveness);
             this.pageLimits = pageLimits;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java 
b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 11bbc0e..344c64d 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -126,7 +126,11 @@ public class MultiPartitionPager implements QueryPager
     {
         int toQuery = Math.min(remaining, pageSize);
         PagersIterator iter = new PagersIterator(toQuery, consistency, 
clientState, null);
-        DataLimits.Counter counter = 
limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
+        /**
+         * It's safe to pass false here since all PartitionIterators have already been 
filtered by each SPRC (SinglePartitionReadCommand).
+         */
+        boolean enforceStrictLiveness = false;
+        DataLimits.Counter counter = 
limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions, 
enforceStrictLiveness);
         iter.setCounter(counter);
         return counter.applyTo(iter);
     }
@@ -136,7 +140,11 @@ public class MultiPartitionPager implements QueryPager
     {
         int toQuery = Math.min(remaining, pageSize);
         PagersIterator iter = new PagersIterator(toQuery, null, null, 
orderGroup);
-        DataLimits.Counter counter = 
limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
+        /**
+         * It's safe to pass false here since all PartitionIterators have already been 
filtered by each SPRC (SinglePartitionReadCommand).
+         */
+        boolean enforceStrictLiveness = false;
+        DataLimits.Counter counter = 
limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions, 
enforceStrictLiveness);
         iter.setCounter(counter);
         return counter.applyTo(iter);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java 
b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index c26bf3f..6bc1f80 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -55,7 +55,7 @@ public class QueryPagers
         {
             try (PartitionIterator iter = pager.fetchPage(pageSize, 
consistencyLevel, state))
             {
-                DataLimits.Counter counter = limits.newCounter(nowInSec, true, 
command.selectsFullPartition());
+                DataLimits.Counter counter = limits.newCounter(nowInSec, true, 
command.selectsFullPartition(), metadata.enforceStrictLiveness());
                 PartitionIterators.consume(counter.applyTo(iter));
                 count += counter.counted();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java 
b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
index 9e32620..d0f80f8 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
@@ -796,6 +796,7 @@ public class ViewComplexTest extends CQLTester
         if (flush)
             FBUtilities.waitOnFutures(ks.flush());
         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(2, 3, 3, 6L));
+        assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv limit 1"), row(2, 3, 3, 6L));
         // change v1's to 1 and remove existing view row with ts8
         updateView("UPdate %s using timestamp 8 set v1 = 1 where p = 3;");
         if (flush)
@@ -804,6 +805,65 @@ public class ViewComplexTest extends CQLTester
     }
 
     @Test
+    public void testExpiredLivenessLimitWithFlush() throws Throwable
+    {
+        // CASSANDRA-13883
+        testExpiredLivenessLimit(true);
+    }
+
+    @Test
+    public void testExpiredLivenessLimitWithoutFlush() throws Throwable
+    {
+        // CASSANDRA-13883
+        testExpiredLivenessLimit(false);
+    }
+
+    private void testExpiredLivenessLimit(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+        Keyspace ks = Keyspace.open(keyspace());
+
+        createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);");
+        createView("mv2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s 
WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k);");
+        ks.getColumnFamilyStore("mv1").disableAutoCompaction();
+        ks.getColumnFamilyStore("mv2").disableAutoCompaction();
+
+        for (int i = 1; i <= 100; i++)
+            updateView("INSERT INTO %s(k, a, b) VALUES (?, ?, ?);", i, i, i);
+        for (int i = 1; i <= 100; i++)
+        {
+            if (i % 50 == 0)
+                continue;
+            // create expired liveness
+            updateView("DELETE a FROM %s WHERE k = ?;", i);
+        }
+        if (flush)
+        {
+            ks.getColumnFamilyStore("mv1").forceBlockingFlush();
+            ks.getColumnFamilyStore("mv2").forceBlockingFlush();
+        }
+
+        for (String view : Arrays.asList("mv1", "mv2"))
+        {
+            // paging
+            assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b 
FROM %s limit 1", view), 1).all().size());
+            assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b 
FROM %s limit 2", view), 1).all().size());
+            assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b 
FROM %s", view), 1).all().size());
+            assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b 
FROM %s ", view), 1),
+                          row(50, 50, 50),
+                          row(100, 100, 100));
+            // limit
+            assertEquals(1, execute(String.format("SELECT k,a,b FROM %s limit 
1", view)).size());
+            assertRowsIgnoringOrder(execute(String.format("SELECT k,a,b FROM 
%s limit 2", view)),
+                                    row(50, 50, 50),
+                                    row(100, 100, 100));
+        }
+    }
+
+    @Test
     public void testUpdateWithColumnTimestampBiggerThanPkWithFlush() throws 
Throwable
     {
         // CASSANDRA-11500
@@ -848,6 +908,7 @@ public class ViewComplexTest extends CQLTester
             FBUtilities.waitOnFutures(ks.flush());
         ks.getColumnFamilyStore("mv").forceMajorCompaction();
         assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2));
+        assertRowsIgnoringOrder(execute("SELECT k,a,b from mv limit 1"), 
row(1, 2, 2));
         updateView("UPDATE %s USING TIMESTAMP 11 SET a = 1 WHERE k = 1;");
         if (flush)
             FBUtilities.waitOnFutures(ks.flush());
@@ -1048,12 +1109,12 @@ public class ViewComplexTest extends CQLTester
                                        .sorted(Comparator.comparingInt(s -> 
s.descriptor.generation))
                                        .map(s -> s.getFilename())
                                        .collect(Collectors.toList());
-            System.out.println("SSTables " + sstables);
             String dataFiles = String.join(",", Arrays.asList(sstables.get(1), 
sstables.get(2)));
             CompactionManager.instance.forceUserDefinedCompaction(dataFiles);
         }
         // cell-tombstone in sstable 4 is not compacted away, because the 
shadowable tombstone is shadowed by new row.
         assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv"), row(1, 3, null, null));
+        assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from 
mv limit 1"), row(1, 3, null, null));
     }
 
     @Test
@@ -1172,6 +1233,7 @@ public class ViewComplexTest extends CQLTester
             FBUtilities.waitOnFutures(ks.flush());
         // deleted column in MV remained dead
         assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null));
+        assertRowsIgnoringOrder(execute("SELECT * from mv limit 1"), row(1, 3, 
null));
 
         // insert values TS=2, it should be considered dead due to previous 
tombstone
         executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET v2 = ? 
WHERE p = ?", 4, 3);
@@ -1183,6 +1245,7 @@ public class ViewComplexTest extends CQLTester
 
         ks.getColumnFamilyStore("mv").forceMajorCompaction();
         assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 
3, 4, 3L));
+        assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), 
row(1, 3, 4, 3L));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java 
b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 84b2773..1107a64 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -363,7 +363,9 @@ public class ViewTest extends CQLTester
         updateView("UPDATE %s USING TIMESTAMP 2 SET val = ? WHERE k = ?", 1, 
0);
         updateView("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE k = ?", 2, 0);
         updateView("UPDATE %s USING TIMESTAMP 3 SET val = ? WHERE k = ?", 2, 
0);
+
         assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(2, 0, 2));
+        assertRows(execute("SELECT c, k, val FROM mv_rctstest limit 1"), 
row(2, 0, 2));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java 
b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index d0cc890..967a85c 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -76,6 +76,7 @@ public class RangeTombstoneTest
     {
         Keyspace keyspace = Keyspace.open(KSNAME);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME);
+        boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
 
         // Inserting data
         String key = "k1";
@@ -112,17 +113,21 @@ public class RangeTombstoneTest
         int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i : live)
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
         for (int i : dead)
-            assertFalse("Row " + i + " shouldn't be live", 
partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live",
+                        partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
 
         // Queries by slices
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
key).fromIncl(7).toIncl(30).build());
 
         for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 })
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
         for (int i : new int[]{ 10, 12, 14, 16, 18, 19, 20, 21, 22, 23, 24, 
25, 26, 27 })
-            assertFalse("Row " + i + " shouldn't be live", 
partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live",
+                        partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
     }
 
     @Test
@@ -385,6 +390,7 @@ public class RangeTombstoneTest
         CompactionManager.instance.disableAutoCompaction();
         Keyspace keyspace = Keyspace.open(KSNAME);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME);
+        boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
 
         // Inserting data
         String key = "k2";
@@ -408,22 +414,30 @@ public class RangeTombstoneTest
         int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
         for (int i = 5; i <= 15; i++)
-            assertFalse("Row " + i + " shouldn't be live", 
partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live",
+                        partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
 
         // Compact everything and re-test
         CompactionManager.instance.performMaximal(cfs, false);
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
key).build());
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds(),
+                                                                           
enforceStrictLiveness));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new 
Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live",
+                       partition.getRow(new 
Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds(),
+                                                                           
enforceStrictLiveness));
         for (int i = 5; i <= 15; i++)
-            assertFalse("Row " + i + " shouldn't be live", 
partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live",
+                        partition.getRow(new 
Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java 
b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 436b916..f02f4c2 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -320,6 +320,7 @@ public class CompactionsPurgeTest
         Keyspace keyspace = Keyspace.open(KEYSPACE2);
         String cfName = "Standard1";
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
+        final boolean enforceStrictLiveness = 
cfs.metadata.enforceStrictLiveness();
         String key3 = "key3";
 
         // inserts
@@ -352,7 +353,7 @@ public class CompactionsPurgeTest
         ImmutableBTreePartition partition = 
Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key3).build());
         assertEquals(2, partition.rowCount());
         for (Row row : partition)
-            assertFalse(row.hasLiveData(FBUtilities.nowInSeconds()));
+            assertFalse(row.hasLiveData(FBUtilities.nowInSeconds(), 
enforceStrictLiveness));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to