Handle limit correctly on tables with strict liveness
Patch by Zhao Yang; reviewed by Paulo Motta for CASSANDRA-13883
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68bdf454 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68bdf454 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68bdf454 Branch: refs/heads/cassandra-3.11 Commit: 68bdf45477417c97fa6ed3840eee39b8390fd678 Parents: 51e6f24 Author: Zhao Yang <zhaoyangsingap...@gmail.com> Authored: Tue Sep 19 18:35:24 2017 +0800 Committer: Paulo Motta <pa...@apache.org> Committed: Mon Sep 25 01:00:45 2017 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 5 +- .../org/apache/cassandra/db/ReadCommand.java | 3 +- .../db/SinglePartitionReadCommand.java | 9 ++- .../apache/cassandra/db/filter/DataLimits.java | 81 ++++++++++++++------ .../db/partitions/CachedBTreePartition.java | 3 +- .../apache/cassandra/db/rows/AbstractRow.java | 6 +- src/java/org/apache/cassandra/db/rows/Row.java | 7 +- .../cassandra/db/view/ViewUpdateGenerator.java | 6 +- .../composites/ClusteringColumnIndex.java | 5 +- .../internal/composites/PartitionKeyIndex.java | 4 +- .../apache/cassandra/service/DataResolver.java | 7 +- .../apache/cassandra/service/StorageProxy.java | 12 ++- .../service/pager/AbstractQueryPager.java | 4 +- .../service/pager/MultiPartitionPager.java | 12 ++- .../cassandra/service/pager/QueryPagers.java | 2 +- .../apache/cassandra/cql3/ViewComplexTest.java | 65 +++++++++++++++- .../org/apache/cassandra/cql3/ViewTest.java | 2 + .../apache/cassandra/db/RangeTombstoneTest.java | 34 +++++--- .../db/compaction/CompactionsPurgeTest.java | 3 +- 20 files changed, 213 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 
68da81a..9cba02b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Handle limit correctly on tables with strict liveness (CASSANDRA-13883) * Fix missing original update in TriggerExecutor (CASSANDRA-13894) * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043) * Improve short read protection performance (CASSANDRA-13794) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index e6e46b2..0d19856 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1495,7 +1495,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached // partition has more live rows that queried (where live rows refers to the rows that are live now), // or if we can prove that everything the filter selects is in the cached partition based on its content. 
- return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec, filter.selectsAllPartition())) + return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, + nowInSec, + filter.selectsAllPartition(), + metadata.enforceStrictLiveness())) || filter.isFullyCoveredBy(cached); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index b73cdde..160b104 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -450,6 +450,7 @@ public abstract class ReadCommand implements ReadQuery private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness(); private int liveRows = 0; private int tombstones = 0; @@ -472,7 +473,7 @@ public abstract class ReadCommand implements ReadQuery @Override public Row applyToRow(Row row) { - if (row.hasLiveData(ReadCommand.this.nowInSec())) + if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness)) ++liveRows; for (Cell cell : row.cells()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 00464ca..7a66eca 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -556,6 +556,7 @@ public class 
SinglePartitionReadCommand extends ReadCommand try { final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); + final boolean enforceStrictLiveness = metadata().enforceStrictLiveness(); @SuppressWarnings("resource") // we close on exception or upon closing the result of this method UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); @@ -579,7 +580,7 @@ public class SinglePartitionReadCommand extends ReadCommand if (unfiltered.isRow()) { Row row = (Row) unfiltered; - if (row.hasLiveData(nowInSec())) + if (row.hasLiveData(nowInSec(), enforceStrictLiveness)) rowsCounted++; } return unfiltered; @@ -1161,10 +1162,14 @@ public class SinglePartitionReadCommand extends ReadCommand for (SinglePartitionReadCommand cmd : commands) partitions.add(cmd.executeInternal(orderGroup)); + // Note that the commands in a group must differ only by the partition key on which + // they apply. + boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness(); // Because we only have enforce the limit per command, we need to enforce it globally.
return limits.filter(PartitionIterators.concat(partitions), nowInSec, - selectsFullPartitions); + selectsFullPartitions, + enforceStrictLiveness); } public QueryPager getPager(PagingState pagingState, int protocolVersion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 48ec06a..6b74293 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.filter; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; @@ -46,7 +47,7 @@ public abstract class DataLimits public static final DataLimits NONE = new CQLLimits(NO_LIMIT) { @Override - public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { return false; } @@ -109,7 +110,10 @@ public abstract class DataLimits public abstract DataLimits forShortReadRetry(int toFetch); - public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData); + public abstract boolean hasEnoughLiveData(CachedPartition cached, + int nowInSec, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness); /** * Returns a new {@code Counter} for this limits. @@ -120,9 +124,14 @@ public abstract class DataLimits * {@code RowIterator} (since it only returns live rows), false otherwise. 
* @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted * as 1 valid row. + * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info, + * normally retrieved from {@link CFMetaData#enforceStrictLiveness()} * @return a new {@code Counter} for this limits. */ - public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData); + public abstract Counter newCounter(int nowInSec, + boolean assumeLiveData, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness); /** * The max number of results this limits enforces. @@ -140,19 +149,27 @@ public abstract class DataLimits int nowInSec, boolean countPartitionsWithOnlyStaticData) { - return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); + return this.newCounter(nowInSec, + false, + countPartitionsWithOnlyStaticData, + iter.metadata().enforceStrictLiveness()) + .applyTo(iter); } public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData) { - return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); + return this.newCounter(nowInSec, + false, + countPartitionsWithOnlyStaticData, + iter.metadata().enforceStrictLiveness()) + .applyTo(iter); } - public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData) + public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { - return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter); + return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData, enforceStrictLiveness).applyTo(iter); } /** @@ -301,7 +318,7 @@ public abstract class DataLimits return new CQLLimits(toFetch, NO_LIMIT, isDistinct); } - public boolean 
hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { // We want the number of row that are currently live. Getting that precise number forces // us to iterate the cached partition in general, but we can avoid that if: @@ -316,7 +333,7 @@ public abstract class DataLimits // Otherwise, we need to re-count - DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData); + DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness); try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); UnfilteredRowIterator iter = counter.applyTo(cacheIter)) { @@ -327,9 +344,12 @@ public abstract class DataLimits } } - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, + boolean assumeLiveData, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness) { - return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); } public int count() @@ -360,25 +380,30 @@ public abstract class DataLimits protected int rowInCurrentPartition; protected boolean hasLiveStaticRow; + private final boolean enforceStrictLiveness; - public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public CQLCounter(int nowInSec, + boolean assumeLiveData, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness) { this.nowInSec = nowInSec; this.assumeLiveData = assumeLiveData; this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; + 
this.enforceStrictLiveness = enforceStrictLiveness; } @Override public void applyToPartition(DecoratedKey partitionKey, Row staticRow) { rowInCurrentPartition = 0; - hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec)); + hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec, enforceStrictLiveness)); } @Override public Row applyToRow(Row row) { - if (assumeLiveData || row.hasLiveData(nowInSec)) + if (assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness)) incrementRowCount(); return row; } @@ -473,16 +498,19 @@ public abstract class DataLimits } @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { - return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); } private class PagingAwareCounter extends CQLCounter { - private PagingAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + private PagingAwareCounter(int nowInSec, + boolean assumeLiveData, + boolean countPartitionsWithOnlyStaticData, + boolean enforceStrictLiveness) { - super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness); } @Override @@ -556,7 +584,7 @@ public abstract class DataLimits return new ThriftLimits(1, toFetch); } - public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData) + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { // We want the number of cells that 
are currently live. Getting that precise number forces // us to iterate the cached partition in general, but we can avoid that if: @@ -570,7 +598,7 @@ public abstract class DataLimits return false; // Otherwise, we need to re-count - DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData); + DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness); try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); UnfilteredRowIterator iter = counter.applyTo(cacheIter)) { @@ -581,7 +609,7 @@ public abstract class DataLimits } } - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { return new ThriftCounter(nowInSec, assumeLiveData); } @@ -711,23 +739,26 @@ public abstract class DataLimits } @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness) { - return new SuperColumnCountingCounter(nowInSec, assumeLiveData); + return new SuperColumnCountingCounter(nowInSec, assumeLiveData, enforceStrictLiveness); } protected class SuperColumnCountingCounter extends ThriftCounter { - public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData) + private final boolean enforceStrictLiveness; + + public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness) { super(nowInSec, assumeLiveData); + this.enforceStrictLiveness = enforceStrictLiveness; } @Override public Row applyToRow(Row row) { // In the internal format, a row == a super column, so that's what we want to count. 
- if (assumeLiveData || row.hasLiveData(nowInSec)) + if (assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness)) { ++cellsCounted; if (++cellsInCurrentPartition >= cellPerPartitionLimit) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java index afe1cc3..9c6ab59 100644 --- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java @@ -91,10 +91,11 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac int rowsWithNonExpiringCells = 0; int nonTombstoneCellCount = 0; int nonExpiringLiveCells = 0; + boolean enforceStrictLiveness = iterator.metadata().enforceStrictLiveness(); for (Row row : BTree.<Row>iterable(holder.tree)) { - if (row.hasLiveData(nowInSec)) + if (row.hasLiveData(nowInSec, enforceStrictLiveness)) ++cachedLiveRows; int nonExpiringLiveCellsThisRow = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index 59addeb..67ed219 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -42,11 +42,13 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme return Unfiltered.Kind.ROW; } - public boolean hasLiveData(int nowInSec) + @Override + public boolean hasLiveData(int nowInSec, boolean enforceStrictLiveness) { if (primaryKeyLivenessInfo().isLive(nowInSec)) return true; - 
+ else if (enforceStrictLiveness) + return false; return Iterables.any(cells(), cell -> cell.isLive(nowInSec)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 3bcc220..3c97e09 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -103,8 +103,13 @@ public interface Row extends Unfiltered, Collection<ColumnData> /** * Whether the row has some live information (i.e. it's not just deletion informations). + * + * @param nowInSec the current time to decide what is deleted and what isn't + * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info, + * normally retrieved from {@link CFMetaData#enforceStrictLiveness()} + * @return true if there is some live information */ - public boolean hasLiveData(int nowInSec); + public boolean hasLiveData(int nowInSec, boolean enforceStrictLiveness); /** * Returns a cell for a simple column. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java index 0c8e078..341c511 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java +++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java @@ -51,6 +51,7 @@ public class ViewUpdateGenerator private final ByteBuffer[] basePartitionKey; private final CFMetaData viewMetadata; + private final boolean baseEnforceStrictLiveness; private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>(); @@ -87,6 +88,7 @@ public class ViewUpdateGenerator this.nowInSec = nowInSec; this.baseMetadata = view.getDefinition().baseTableMetadata(); + this.baseEnforceStrictLiveness = baseMetadata.enforceStrictLiveness(); this.baseDecoratedKey = basePartitionKey; this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator()); @@ -186,8 +188,8 @@ public class ViewUpdateGenerator // The view entry is necessarily the same pre and post update. // Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations). - boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec); - boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec); + boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness); + boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness); return existingHasLiveData ? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD) : (mergedHasLiveData ? 
UpdateAction.NEW_ENTRY : UpdateAction.NONE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java index b932602..cace6de 100644 --- a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java +++ b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java @@ -45,9 +45,12 @@ import org.apache.cassandra.schema.IndexMetadata; */ public class ClusteringColumnIndex extends CassandraIndex { + private final boolean enforceStrictLiveness; + public ClusteringColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) { super(baseCfs, indexDef); + this.enforceStrictLiveness = baseCfs.metadata.enforceStrictLiveness(); } @@ -95,6 +98,6 @@ public class ClusteringColumnIndex extends CassandraIndex public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - return !data.hasLiveData(nowInSec); + return !data.hasLiveData(nowInSec, enforceStrictLiveness); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java index 2c0b5aa..d854102 100644 --- a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java +++ b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java @@ -47,9 +47,11 @@ import org.apache.cassandra.schema.IndexMetadata; */ public class PartitionKeyIndex extends CassandraIndex { + private final 
boolean enforceStrictLiveness; public PartitionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef) { super(baseCfs, indexDef); + this.enforceStrictLiveness = baseCfs.metadata.enforceStrictLiveness(); } public ByteBuffer getIndexedValue(ByteBuffer partitionKey, @@ -90,6 +92,6 @@ public class PartitionKeyIndex extends CassandraIndex public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - return !data.hasLiveData(nowInSec); + return !data.hasLiveData(nowInSec, enforceStrictLiveness); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index f1eedd1..61bffe5 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -44,9 +44,12 @@ public class DataResolver extends ResponseResolver @VisibleForTesting final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + private final boolean enforceStrictLiveness; + DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) { super(keyspace, command, consistency, maxResponseCount); + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } public PartitionIterator getData() @@ -98,7 +101,7 @@ public class DataResolver extends ResponseResolver */ DataLimits.Counter mergedResultCounter = - command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition()); + command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter); FilteredPartitions filtered = @@ -125,7 +128,7 @@ public class DataResolver extends 
ResponseResolver for (int i = 0; i < results.size(); i++) { DataLimits.Counter singleResultCounter = - command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); + command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); ShortReadResponseProtection protection = new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 6bf275d..5af2ad0 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1598,10 +1598,13 @@ public class StorageProxy implements StorageProxyMBean try { PartitionIterator result = fetchRows(group.commands, consistencyLevel); + // Note that the commands in a group must differ only by the partition key on which + // they apply.
+ boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness(); // If we have more than one command, then despite each read command honoring the limit, the total result // might not honor it and so we should enforce it if (group.commands.size() > 1) - result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition()); + result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness); return result; } catch (UnavailableException e) @@ -1995,6 +1998,7 @@ public class StorageProxy implements StorageProxyMBean private final PartitionRangeReadCommand command; private final Keyspace keyspace; private final ConsistencyLevel consistency; + private final boolean enforceStrictLiveness; private final long startTime; private DataLimits.Counter counter; @@ -2015,6 +2019,7 @@ public class StorageProxy implements StorageProxyMBean this.totalRangeCount = ranges.rangeCount(); this.consistency = consistency; this.keyspace = keyspace; + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } public RowIterator computeNext() @@ -2119,7 +2124,7 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size()); // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE. 
- counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition()); + counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); return counter.applyTo(PartitionIterators.concat(concurrentQueries)); } @@ -2163,7 +2168,8 @@ public class StorageProxy implements StorageProxyMBean return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec(), - command.selectsFullPartition()); + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); } public Map<String, List<String>> getSchemaVersions() http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index ffd1b82..f44aa24 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -32,6 +32,7 @@ abstract class AbstractQueryPager implements QueryPager protected final ReadCommand command; protected final DataLimits limits; protected final int protocolVersion; + private final boolean enforceStrictLiveness; private int remaining; @@ -48,6 +49,7 @@ abstract class AbstractQueryPager implements QueryPager this.command = command; this.protocolVersion = protocolVersion; this.limits = command.limits(); + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); this.remaining = limits.count(); this.remainingInPartition = limits.perPartitionCount(); @@ -126,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager private Pager(DataLimits pageLimits, int nowInSec) { - this.counter = 
pageLimits.newCounter(nowInSec, true, command.selectsFullPartition()); + this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition(), enforceStrictLiveness); this.pageLimits = pageLimits; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 11bbc0e..344c64d 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -126,7 +126,11 @@ public class MultiPartitionPager implements QueryPager { int toQuery = Math.min(remaining, pageSize); PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null); - DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions); + /** + * It's safe to set it as false since all PartitionIterators have been filtered by each SPRC. + */ + boolean enforceStrictLiveness = false; + DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions, enforceStrictLiveness); iter.setCounter(counter); return counter.applyTo(iter); } @@ -136,7 +140,11 @@ public class MultiPartitionPager implements QueryPager { int toQuery = Math.min(remaining, pageSize); PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup); - DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions); + /** + * It's safe to set it as false since all PartitionIterators have been filtered by each SPRC. 
+ */ + boolean enforceStrictLiveness = false; + DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions, enforceStrictLiveness); iter.setCounter(counter); return counter.applyTo(iter); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index c26bf3f..6bc1f80 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -55,7 +55,7 @@ public class QueryPagers { try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state)) { - DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition()); + DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition(), metadata.enforceStrictLiveness()); PartitionIterators.consume(counter.applyTo(iter)); count += counter.counted(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java index 9e32620..d0f80f8 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java @@ -796,6 +796,7 @@ public class ViewComplexTest extends CQLTester if (flush) FBUtilities.waitOnFutures(ks.flush()); assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(2, 3, 3, 6L)); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(2, 3, 3, 6L)); // change v1's to 1 and remove 
existing view row with ts8 updateView("UPdate %s using timestamp 8 set v1 = 1 where p = 3;"); if (flush) @@ -804,6 +805,65 @@ public class ViewComplexTest extends CQLTester } @Test + public void testExpiredLivenessLimitWithFlush() throws Throwable + { + // CASSANDRA-13883 + testExpiredLivenessLimit(true); + } + + @Test + public void testExpiredLivenessLimitWithoutFlush() throws Throwable + { + // CASSANDRA-13883 + testExpiredLivenessLimit(false); + } + + private void testExpiredLivenessLimit(boolean flush) throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int);"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv1", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a);"); + createView("mv2", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k);"); + ks.getColumnFamilyStore("mv1").disableAutoCompaction(); + ks.getColumnFamilyStore("mv2").disableAutoCompaction(); + + for (int i = 1; i <= 100; i++) + updateView("INSERT INTO %s(k, a, b) VALUES (?, ?, ?);", i, i, i); + for (int i = 1; i <= 100; i++) + { + if (i % 50 == 0) + continue; + // create expired liveness + updateView("DELETE a FROM %s WHERE k = ?;", i); + } + if (flush) + { + ks.getColumnFamilyStore("mv1").forceBlockingFlush(); + ks.getColumnFamilyStore("mv2").forceBlockingFlush(); + } + + for (String view : Arrays.asList("mv1", "mv2")) + { + // paging + assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size()); + assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size()); + assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s", view), 1).all().size()); + assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b FROM %s ", view), 1), + row(50, 50, 50), + 
row(100, 100, 100)); + // limit + assertEquals(1, execute(String.format("SELECT k,a,b FROM %s limit 1", view)).size()); + assertRowsIgnoringOrder(execute(String.format("SELECT k,a,b FROM %s limit 2", view)), + row(50, 50, 50), + row(100, 100, 100)); + } + } + + @Test public void testUpdateWithColumnTimestampBiggerThanPkWithFlush() throws Throwable { // CASSANDRA-11500 @@ -848,6 +908,7 @@ public class ViewComplexTest extends CQLTester FBUtilities.waitOnFutures(ks.flush()); ks.getColumnFamilyStore("mv").forceMajorCompaction(); assertRowsIgnoringOrder(execute("SELECT k,a,b from mv"), row(1, 2, 2)); + assertRowsIgnoringOrder(execute("SELECT k,a,b from mv limit 1"), row(1, 2, 2)); updateView("UPDATE %s USING TIMESTAMP 11 SET a = 1 WHERE k = 1;"); if (flush) FBUtilities.waitOnFutures(ks.flush()); @@ -1048,12 +1109,12 @@ public class ViewComplexTest extends CQLTester .sorted(Comparator.comparingInt(s -> s.descriptor.generation)) .map(s -> s.getFilename()) .collect(Collectors.toList()); - System.out.println("SSTables " + sstables); String dataFiles = String.join(",", Arrays.asList(sstables.get(1), sstables.get(2))); CompactionManager.instance.forceUserDefinedCompaction(dataFiles); } // cell-tombstone in sstable 4 is not compacted away, because the shadowable tombstone is shadowed by new row. assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, null, null)); + assertRowsIgnoringOrder(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(1, 3, null, null)); } @Test @@ -1172,6 +1233,7 @@ public class ViewComplexTest extends CQLTester FBUtilities.waitOnFutures(ks.flush()); // deleted column in MV remained dead assertRowsIgnoringOrder(execute("SELECT * from mv"), row(1, 3, null)); + assertRowsIgnoringOrder(execute("SELECT * from mv limit 1"), row(1, 3, null)); // insert values TS=2, it should be considered dead due to previous tombstone executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET v2 = ? 
WHERE p = ?", 4, 3); @@ -1183,6 +1245,7 @@ public class ViewComplexTest extends CQLTester ks.getColumnFamilyStore("mv").forceMajorCompaction(); assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv"), row(1, 3, 4, 3L)); + assertRows(execute("SELECT v1, p, v2, WRITETIME(v2) from mv limit 1"), row(1, 3, 4, 3L)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 84b2773..1107a64 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -363,7 +363,9 @@ public class ViewTest extends CQLTester updateView("UPDATE %s USING TIMESTAMP 2 SET val = ? WHERE k = ?", 1, 0); updateView("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE k = ?", 2, 0); updateView("UPDATE %s USING TIMESTAMP 3 SET val = ? 
WHERE k = ?", 2, 0); + assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(2, 0, 2)); + assertRows(execute("SELECT c, k, val FROM mv_rctstest limit 1"), row(2, 0, 2)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java index d0cc890..967a85c 100644 --- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java +++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java @@ -76,6 +76,7 @@ public class RangeTombstoneTest { Keyspace keyspace = Keyspace.open(KSNAME); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); + boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness(); // Inserting data String key = "k1"; @@ -112,17 +113,21 @@ public class RangeTombstoneTest int nowInSec = FBUtilities.nowInSeconds(); for (int i : live) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertTrue("Row " + i + " should be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); for (int i : dead) - assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertFalse("Row " + i + " shouldn't be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); // Queries by slices partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(7).toIncl(30).build()); for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 }) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertTrue("Row " + i + " should be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); for (int i : 
new int[]{ 10, 12, 14, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27 }) - assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertFalse("Row " + i + " shouldn't be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); } @Test @@ -385,6 +390,7 @@ public class RangeTombstoneTest CompactionManager.instance.disableAutoCompaction(); Keyspace keyspace = Keyspace.open(KSNAME); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME); + boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness(); // Inserting data String key = "k2"; @@ -408,22 +414,30 @@ public class RangeTombstoneTest int nowInSec = FBUtilities.nowInSeconds(); for (int i = 0; i < 5; i++) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertTrue("Row " + i + " should be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); for (int i = 16; i < 20; i++) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertTrue("Row " + i + " should be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); for (int i = 5; i <= 15; i++) - assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertFalse("Row " + i + " shouldn't be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); // Compact everything and re-test CompactionManager.instance.performMaximal(cfs, false); partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build()); for (int i = 0; i < 5; i++) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds())); + assertTrue("Row " + i + " should be live", + partition.getRow(new 
Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds(), + enforceStrictLiveness)); for (int i = 16; i < 20; i++) - assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds())); + assertTrue("Row " + i + " should be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds(), + enforceStrictLiveness)); for (int i = 5; i <= 15; i++) - assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec)); + assertFalse("Row " + i + " shouldn't be live", + partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec, enforceStrictLiveness)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/68bdf454/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java index 436b916..f02f4c2 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java @@ -320,6 +320,7 @@ public class CompactionsPurgeTest Keyspace keyspace = Keyspace.open(KEYSPACE2); String cfName = "Standard1"; ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName); + final boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness(); String key3 = "key3"; // inserts @@ -352,7 +353,7 @@ public class CompactionsPurgeTest ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key3).build()); assertEquals(2, partition.rowCount()); for (Row row : partition) - assertFalse(row.hasLiveData(FBUtilities.nowInSeconds())); + assertFalse(row.hasLiveData(FBUtilities.nowInSeconds(), enforceStrictLiveness)); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org