This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 14a743f8f73023b81c951651f19f7d11a28c8b72 Author: Blake Eggleston <[email protected]> AuthorDate: Wed Apr 30 10:52:30 2025 -0700 CEP-45: Fix ALLOW FILTERING queries for mutation tracking Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-20555 --- src/java/org/apache/cassandra/db/ReadCommand.java | 31 +- .../cassandra/db/SinglePartitionReadCommand.java | 17 + .../org/apache/cassandra/db/filter/RowFilter.java | 135 +++-- .../db/partitions/PartitionIterators.java | 66 ++- .../reads/tracked/AbstractPartialTrackedRead.java | 228 ++++++-- .../reads/tracked/ExtendingCompletedRead.java | 186 ++++++ .../reads/tracked/FilteredFollowupRead.java | 177 ++++++ .../reads/tracked/PartialTrackedRangeRead.java | 638 +++++++++++++++------ .../service/reads/tracked/PartialTrackedRead.java | 25 +- .../tracked/PartialTrackedSinglePartitionRead.java | 110 ++-- .../service/reads/tracked/TrackedDataResponse.java | 87 ++- .../reads/tracked/TrackedLocalReadCoordinator.java | 18 +- .../service/reads/tracked/TrackedLocalReads.java | 5 +- .../service/reads/tracked/TrackedRead.java | 35 +- test/conf/logback-dtest.xml | 8 +- .../test/ReadRepairRangeQueriesTest.java | 123 +++- .../test/ReadRepairSliceQueriesTest.java | 3 - .../tracking/MutationTrackingPendingReadTest.java | 2 +- .../MutationTrackingReadReconciliationTest.java | 36 +- 19 files changed, 1546 insertions(+), 384 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index edd5aad08e..7f70a35eb4 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -544,6 +544,24 @@ public abstract class ReadCommand extends AbstractReadQuery } } + public RowFilter rowFilter(Index.Searcher searcher) + { + // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so + // no point in checking it again. + return (null == searcher) ? 
rowFilter() : indexQueryPlan.postIndexQueryFilter(); + } + + private UnfilteredPartitionIterator withRowFilter(UnfilteredPartitionIterator iterator, Index.Searcher searcher) + { + /* + * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + * would be more efficient (the sooner we discard stuff we know we don't care, the less useless + * processing we do on it). + */ + return rowFilter(searcher).filter(iterator, nowInSec()); + } + private UnfilteredPartitionIterator completeRead(UnfilteredPartitionIterator iterator, ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos) { COMMAND.set(this); @@ -556,18 +574,7 @@ public abstract class ReadCommand extends AbstractReadQuery iterator = maybeRecordPurgeableTombstones(iterator, cfs); iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false); iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos); - - // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so - // no point in checking it again. - RowFilter filter = (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter(); - - /* - * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, - * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it - * would be more efficient (the sooner we discard stuff we know we don't care, the less useless - * processing we do on it). 
- */ - iterator = filter.filter(iterator, nowInSec()); + iterator = withRowFilter(iterator, searcher); // apply the limits/row counter; this transformation is stopping and would close the iterator as soon // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included. diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 667c8349f9..1b40d679eb 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -182,6 +182,23 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar dataRange); } + public static SinglePartitionReadCommand fromRangeRead(DecoratedKey key, PartitionRangeReadCommand command, DataLimits limits) + { + return create(command.serializedAtEpoch(), + command.isDigestQuery(), + command.digestVersion(), + command.acceptsTransient(), + command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + limits, + key, + command.clusteringIndexFilter(key), + command.indexQueryPlan(), + command.isTrackingWarnings()); + } + /** * Creates a new read command on a single partition. 
* diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index ef9f1a7e2c..c3c5fc3a80 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -54,6 +54,7 @@ import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.BaseRowIterator; import org.apache.cassandra.db.rows.Cell; @@ -61,6 +62,7 @@ import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -216,76 +218,123 @@ public class RowFilter implements Iterable<RowFilter.Expression> return false; } - /** - * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means - * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions. - * The filter will always be evaluated strictly in conjunction with replica filtering protection at the - * coordinator, however, even after CASSANDRA-19007 is addressed. 
- * - * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-190007">CASSANDRA-19007</a> - */ - protected Transformation<BaseRowIterator<?>> filter(TableMetadata metadata, long nowInSec) + public static class RowFilterTransformation extends Transformation<BaseRowIterator<?>> { - List<Expression> partitionLevelExpressions = new ArrayList<>(); - List<Expression> rowLevelExpressions = new ArrayList<>(); - for (Expression e: expressions) + private final TableMetadata metadata; + private final long nowInSec; + private final List<Expression> partitionLevelExpressions = new ArrayList<>(); + private final List<Expression> rowLevelExpressions = new ArrayList<>(); + private final boolean needsReconciliation; + private DecoratedKey pk; + + private RowFilterTransformation(RowFilter filter, TableMetadata metadata, long nowInSec) { - if (e.column.isStatic() || e.column.isPartitionKey()) - partitionLevelExpressions.add(e); - else - rowLevelExpressions.add(e); + this.metadata = metadata; + this.nowInSec = nowInSec; + for (Expression e: filter.expressions) + { + if (e.column.isStatic() || e.column.isPartitionKey()) + partitionLevelExpressions.add(e); + else + rowLevelExpressions.add(e); + } + this.needsReconciliation = filter.needsReconciliation(); } - long numberOfRegularColumnExpressions = rowLevelExpressions.size(); - final boolean filterNonStaticColumns = numberOfRegularColumnExpressions > 0; - - return new Transformation<>() + public int potentialMatches(PartitionUpdate update) { - DecoratedKey pk; - - @Override - protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + try (UnfilteredRowIterator partition = update.unfilteredIterator()) { - pk = partition.partitionKey(); - - // Short-circuit all partitions that won't match based on static and partition keys + int matches = 0; for (Expression e : partitionLevelExpressions) - if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec)) + { + if 
(e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec)) + { + matches++; + break; + } + } + + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + + if (unfiltered instanceof Row) { - partition.close(); - return null; + Row row = (Row) unfiltered; + + for (Expression e : rowLevelExpressions) + { + if (e.isSatisfiedBy(metadata, pk, row, nowInSec)) + { + matches++; + break; + } + } } + } + return matches; + } + } - BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator - ? Transformation.apply((UnfilteredRowIterator) partition, this) - : Transformation.apply((RowIterator) partition, this); + @Override + protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + { + pk = partition.partitionKey(); - if (filterNonStaticColumns && !iterator.hasNext()) + // Short-circuit all partitions that won't match based on static and partition keys + for (Expression e : partitionLevelExpressions) + { + if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec)) { - iterator.close(); + partition.close(); return null; } + } + + BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator + ? Transformation.apply((UnfilteredRowIterator) partition, this) + : Transformation.apply((RowIterator) partition, this); - return iterator; + boolean filterNonStaticColumns = !rowLevelExpressions.isEmpty(); + if (filterNonStaticColumns && !iterator.hasNext()) + { + iterator.close(); + return null; } + return iterator; + } + @Override public Row applyToRow(Row row) { // If we purge deletions when reconciliation is required, we hide information replica filtering // protection would require to filter rows that are no longer matches are the coordinator. - Row purged = needsReconciliation() ? row : row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness()); + Row purged = needsReconciliation ? 
row : row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness()); if (purged == null) return null; - for (Expression e : rowLevelExpressions) - if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec)) - return null; + for (Expression e : rowLevelExpressions) + if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec)) + return null; - return row; - } - }; + return row; + } + } + + /** + * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means + * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions. + * The filter will always be evaluated strictly in conjunction with replica filtering protection at the + * coordinator, however, even after CASSANDRA-19007 is addressed. + * + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a> + */ + public RowFilterTransformation filter(TableMetadata metadata, long nowInSec) + { + return new RowFilterTransformation( this, metadata, nowInSec); } /** diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index 569cee34d4..866f5c0e51 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -19,11 +19,13 @@ package org.apache.cassandra.db.partitions; import java.io.IOError; import java.io.IOException; +import java.util.Comparator; import java.util.List; import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.SinglePartitionReadQuery; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.BaseRowIterator; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.RowIterators; import org.apache.cassandra.db.transform.MorePartitions; @@ -32,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus; 
import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.MergeIterator; public abstract class PartitionIterators { @@ -99,6 +102,67 @@ public abstract class PartitionIterators } } + /** + * Merges multiple partition iterators with the requirement that there are no keys in common between any + * of the iterators + */ + public static PartitionIterator mergeNonOverlapping(List<PartitionIterator> iterators) + { + MergeIterator.Reducer<RowIterator, RowIterator> reducer = new MergeIterator.Reducer<>() + { + RowIterator current; + + @Override + protected void onKeyChange() + { + current = null; + } + + @Override + public void reduce(int idx, RowIterator partition) + { + if (current != null) + { + throw new IllegalStateException("Multiple partitions received for " + current.partitionKey()); + } + current = partition; + } + + @Override + protected RowIterator getReduced() + { + return current; + } + }; + + MergeIterator<RowIterator, RowIterator> mergeIterator = MergeIterator.get(iterators, rowIteratorComparator, reducer); + + return new AbstractPartitionIterator() + { + @Override + protected RowIterator computeNext() + { + return mergeIterator.hasNext() ? mergeIterator.next() : endOfData(); + } + }; + } + private static final Comparator<RowIterator> rowIteratorComparator = Comparator.comparing(BaseRowIterator::partitionKey); + + /** + * Consumes all rows in the next partition of the provided partition iterator. + */ + public static void consumeNext(PartitionIterator iterator) + { + if (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) + { + while (partition.hasNext()) + partition.next(); + } + } + } + /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. 
* <p> @@ -209,5 +273,5 @@ public abstract class PartitionIterators } }; } - }; + } } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java index 0cbb3be38d..2819068a43 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java @@ -18,8 +18,6 @@ package org.apache.cassandra.service.reads.tracked; -import java.util.List; - import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -30,34 +28,176 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.transform.RTBoundValidator; -import org.apache.cassandra.index.Index; -import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; public abstract class AbstractPartialTrackedRead implements PartialTrackedRead { private static final Logger logger = LoggerFactory.getLogger(AbstractPartialTrackedRead.class); - private enum State + protected interface Augmentable + { + State augment(PartitionUpdate update); + } + + protected static abstract class State + { + protected static final State CLOSED = new State() + { + @Override + String name() + { + return "closed"; + } + + @Override + boolean isClosed() + { + return true; + } + }; + + abstract String name(); + + boolean isInitialized() + { + return false; + } + + Initialized asInitialized() + { + throw new IllegalStateException("State is " + name() + ", not " + Initialized.NAME); + } + + boolean isPrepared() + { + return false; + } + + Prepared asPrepared() + { + throw new IllegalStateException("State 
is " + name() + ", not " + Prepared.NAME); + } + + boolean isCompleted() + { + return false; + } + + Completed asCompleted() + { + throw new IllegalStateException("State is " + name() + ", not " + Completed.NAME); + } + + boolean isAugmentable() + { + return isPrepared() || isInitialized(); + } + + Augmentable asAugmentable() + { + if (isPrepared()) return asPrepared(); + throw new IllegalStateException("State is " + name() + ", not augmentable"); + } + + boolean isClosed() + { + return false; + } + + void close() + { + } + } + + // TODO (expected): this is a redundant state, never exposed + protected final class Initialized extends State + { + static final String NAME = "initialized"; + + @Override + String name() + { + return NAME; + } + + @Override + boolean isInitialized() + { + return true; + } + + @Override + Initialized asInitialized() + { + return this; + } + + Prepared prepare(UnfilteredPartitionIterator initialData) + { + return prepareInternal(initialData); + } + } + + protected abstract Prepared prepareInternal(UnfilteredPartitionIterator initialData); + + protected abstract class Prepared extends State implements Augmentable { - INITIALIZED, - PREPARED, - READING, - FINISHED + private static final String NAME = "prepared"; + + @Override + String name() + { + return NAME; + } + + @Override + boolean isPrepared() + { + return true; + } + + @Override + Prepared asPrepared() + { + return this; + } + + abstract Completed complete(); + + } + + protected abstract class Completed extends State + { + private static final String NAME = "completed"; + + @Override + String name() + { + return NAME; + } + + protected abstract UnfilteredPartitionIterator iterator(); + protected abstract CompletedRead createResult(UnfilteredPartitionIterator iterator); + + protected CompletedRead getResult() + { + UnfilteredPartitionIterator result = command().completeTrackedRead(iterator(), AbstractPartialTrackedRead.this); + // validate that the sequence of RT markers is correct: 
open is followed by close, deletion times for both + // ends equal, and there are no dangling RT bound in any partition. + result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true); + return createResult(result); + } } final ReadExecutionController executionController; - final Index.Searcher searcher; final ColumnFamilyStore cfs; final long startTimeNanos; - volatile State state = State.INITIALIZED; + private State state = new Initialized(); - public AbstractPartialTrackedRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos) + public AbstractPartialTrackedRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos) { this.executionController = executionController; - this.searcher = searcher; this.cfs = cfs; this.startTimeNanos = startTimeNanos; } @@ -68,12 +208,6 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead return executionController; } - @Override - public Index.Searcher searcher() - { - return searcher; - } - @Override public ColumnFamilyStore cfs() { @@ -86,69 +220,47 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead return startTimeNanos; } - abstract void freezeInitialData(); - - abstract UnfilteredPartitionIterator initialData(); - - abstract UnfilteredPartitionIterator augmentedData(); - - abstract void augmentResponse(PartitionUpdate update); + protected synchronized State state() + { + return state; + } /** * Implementors need to call this before returning this from createInProgressRead + * TODO (expected): this is a redundant transition from a redundant state (INITIALIZED) */ - synchronized void prepare() + synchronized void prepare(UnfilteredPartitionIterator initialData) { logger.trace("Preparing read {}", this); - Preconditions.checkState(state == State.INITIALIZED); - freezeInitialData(); - state = State.PREPARED; + state = 
state.asInitialized().prepare(initialData); } @Override - public void augment(Mutation mutation) + public synchronized void augment(Mutation mutation) { - Preconditions.checkState(state == State.PREPARED); PartitionUpdate update = mutation.getPartitionUpdate(command().metadata()); if (update != null) - augmentResponse(update); + state = state.asAugmentable().augment(update); } - private UnfilteredPartitionIterator complete(UnfilteredPartitionIterator iterator) - { - return command().completeTrackedRead(iterator, this); - } - - abstract CompletedRead createResult(UnfilteredPartitionIterator iterator); - @Override public synchronized CompletedRead complete() { - Preconditions.checkState(state == State.PREPARED); - state = State.READING; - - UnfilteredPartitionIterator initial = initialData(); - UnfilteredPartitionIterator augmented = augmentedData(); - - UnfilteredPartitionIterator result = augmented != null ? - UnfilteredPartitionIterators.merge(List.of(initial, augmented), NOOP) : - initial; - - result = complete(result); - // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both - // ends equal, and there are no dangling RT bound in any partition. 
- result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true); - return createResult(complete(result)); + Preconditions.checkState(state.isPrepared()); + Completed completed = state.asPrepared().complete(); + state = completed; + return completed.getResult(); } @Override public synchronized void close() { - if (state == State.FINISHED) + if (state.isClosed()) return; logger.trace("Closing read {}", this); + state.close(); executionController.close(); - state = State.FINISHED; + state = State.CLOSED; } } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java new file mode 100644 index 0000000000..f5fa4dc25a --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.tracked; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +class ExtendingCompletedRead implements PartialTrackedRead.CompletedRead +{ + private static final Logger logger = LoggerFactory.getLogger(ExtendingCompletedRead.class); + + final PartitionRangeReadCommand command; + final UnfilteredPartitionIterator iterator; + // merged end-result counter + final DataLimits.Counter mergedResultCounter; + + private final boolean partitionsFetched; + private final boolean initialIteratorExhausted; + protected final AbstractBounds<PartitionPosition> followUpBounds; + + public ExtendingCompletedRead(PartitionRangeReadCommand command, + UnfilteredPartitionIterator iterator, + boolean partitionsFetched, + boolean initialIteratorExhausted, + AbstractBounds<PartitionPosition> followUpBounds) + { + this.command = command; + this.iterator = iterator; + mergedResultCounter = command.limits().newCounter(command.nowInSec(), + true, + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); + this.partitionsFetched = 
partitionsFetched; + this.initialIteratorExhausted = initialIteratorExhausted; + this.followUpBounds = followUpBounds; + } + + @Override + public TrackedDataResponse response() + { + PartitionIterator filtered = UnfilteredPartitionIterators.filter(iterator, command.nowInSec()); + PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); + PartitionIterator result = Transformation.apply(counted, new EmptyPartitionsDiscarder()); + return TrackedDataResponse.create(result, command.columnFilter()); + } + + static boolean followUpReadRequired(ReadCommand command, DataLimits.Counter mergedResultCounter, boolean initialIteratorExhausted, boolean partitionsFetched) + { + // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit + if (mergedResultCounter.isDone()) + return false; + + // we do not apply short read protection when we have no limits at all + if (command.limits().isUnlimited()) + return false; + + /* + * If this is a single partition read command or an (indexed) partition range read command with + * a partition key specified, then we can't and shouldn't try fetch more partitions. + */ + if (command.isLimitedToOnePartition()) + return false; + + /* + * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). + */ + if (initialIteratorExhausted && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return false; + + /* + * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. + * There is no point to ask the replica for more rows - it has no more in the requested range. 
+ */ + if (!partitionsFetched) + return false; + + return true; + } + + protected boolean followUpRequired() + { + return followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched); + } + + static int toQuery(ReadCommand command, DataLimits.Counter mergedResultCounter) + { + /* + * We are going to fetch one partition at a time for thrift and potentially more for CQL. + * The row limit will either be set to the per partition limit - if the command has no total row limit set, or + * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, + * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. + */ + return command.limits().count() != DataLimits.NO_LIMIT + ? command.limits().count() - mergedResultCounter.rowsCounted() + : command.limits().perPartitionCount(); + } + + @Override + public Future<TrackedDataResponse> followupRead(TrackedDataResponse initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos) + { + if (!followUpRequired()) + return null; + + + /* + * We are going to fetch one partition at a time for thrift and potentially more for CQL. + * The row limit will either be set to the per partition limit - if the command has no total row limit set, or + * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, + * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. 
+ */ + int toQuery = toQuery(command, mergedResultCounter); + + ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort()); + logger.info("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort()); + + return makeFollowupRead(initialResponse, toQuery, consistencyLevel, expiresAtNanos); + } + + protected Future<TrackedDataResponse> makeFollowupRead(TrackedDataResponse initialResponse, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos) + { + TrackedRead.Range followUpRead = PartialTrackedRangeRead.makeFollowUpRead(command, followUpBounds, toQuery, consistencyLevel, expiresAtNanos); + followUpRead.start(expiresAtNanos); + AsyncPromise<TrackedDataResponse> combinedRead = new AsyncPromise<>(); + followUpRead.future().addCallback((result, failure) -> { + if (failure != null) + { + combinedRead.tryFailure(failure); + return; + } + + try + { + combinedRead.trySuccess(TrackedDataResponse.merge(initialResponse, result)); + } + catch (Throwable t) + { + combinedRead.tryFailure(t); + } + }); + + return combinedRead; + } + + @Override + public void close() + { + iterator.close(); + } +} diff --git a/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java new file mode 100644 index 0000000000..121ce1a581 --- /dev/null +++ b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.tracked; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.FollowUpReadInfo; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; + +import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.followUpReadRequired; +import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.toQuery; +import static org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.makeFollowUpRead; + +class FilteredFollowupRead extends 
AsyncPromise<TrackedDataResponse> +{ + private final TrackedDataResponse initialResponse; + private final int toQuery; + private final ConsistencyLevel consistencyLevel; + private final long expiresAtNanos; + private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo; + private final PartitionRangeReadCommand command; + private final AbstractBounds<PartitionPosition> followUpBounds; + private final DecoratedKey finalKey; + + public FilteredFollowupRead(TrackedDataResponse initialResponse, + int toQuery, + ConsistencyLevel consistencyLevel, + long expiresAtNanos, + SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo, + PartitionRangeReadCommand command, + AbstractBounds<PartitionPosition> followUpBounds, + DecoratedKey finalKey) + { + this.initialResponse = initialResponse; + this.toQuery = toQuery; + this.consistencyLevel = consistencyLevel; + this.expiresAtNanos = expiresAtNanos; + this.followUpReadInfo = followUpReadInfo; + this.command = command; + this.followUpBounds = followUpBounds; + this.finalKey = finalKey; + } + + private boolean interleavesWithOriginal(DecoratedKey key) + { + if (finalKey == null) + return false; + return key.compareTo(finalKey) < 0; + } + + public void start() + { + ClusterMetadata metadata = ClusterMetadata.current(); + List<Future<TrackedDataResponse>> futures = new ArrayList<>(); + + int remaining = toQuery; + PeekingIterator<DecoratedKey> followUpKeys = Iterators.peekingIterator(followUpReadInfo.keySet().iterator()); + // query all keys that interleave with the range of keys from the original range read + while (followUpKeys.hasNext() && (remaining > 0 || interleavesWithOriginal(followUpKeys.peek()))) + { + DecoratedKey key = followUpKeys.next(); + FollowUpReadInfo info = followUpReadInfo.get(key); + remaining -= info.potentialMatches; + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fromRangeRead(key, command, command.limits().forShortReadRetry(toQuery)); + TrackedRead.Partition read = 
TrackedRead.Partition.create(metadata, cmd, consistencyLevel); + read.start(expiresAtNanos); + futures.add(read.future()); + } + + SortedMap<DecoratedKey, FollowUpReadInfo> nextKeys = followUpKeys.hasNext() ? followUpReadInfo.tailMap(followUpKeys.next()) : Collections.emptySortedMap(); + + AtomicReference<PartialTrackedRead> partialRead; + if (remaining > 0) + { + partialRead = new AtomicReference<>(); + TrackedRead.Range rangeRead = makeFollowUpRead(command, followUpBounds, remaining, consistencyLevel, expiresAtNanos); + rangeRead.startLocal(expiresAtNanos, partialRead::set); + futures.add(rangeRead.future()); + } + else + { + partialRead = null; + } + + FutureCombiner.allOf(futures).addCallback((responses, error) -> { + if (error != null) + { + tryFailure(error); + return; + } + + try + { + List<TrackedDataResponse> allResponses = new ArrayList<>(responses); + allResponses.add(initialResponse); + TrackedDataResponse merged = TrackedDataResponse.merge(allResponses); + DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(), + true, + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); + + boolean partitionsFetched; + boolean initialIteratorExhausted; + TrackedDataResponse response; + try (PartitionIterator iterator = merged.makeIteratorUnlimited(command)) + { + partitionsFetched = iterator.hasNext(); + response = TrackedDataResponse.create(mergedResultCounter.applyTo(iterator), command.columnFilter()); + initialIteratorExhausted = iterator.hasNext(); + } + + // although we check for interleaved keys in the initial read, we always query for them in the follow up, so + // we just use normal short read protection checks here + if (followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched)) + { + AbstractBounds<PartitionPosition> nextBounds = this.followUpBounds; + if (partialRead != null) + { + PartialTrackedRangeRead followUpRangeRead = (PartialTrackedRangeRead) 
partialRead.get(); + nextBounds = followUpRangeRead.followUpBounds(); + } + FilteredFollowupRead followUp = new FilteredFollowupRead(response, toQuery(command, mergedResultCounter), consistencyLevel, expiresAtNanos, nextKeys, command, nextBounds, null); + followUp.start(); + followUp.addCallback((result, failure) -> { + if (failure != null) + tryFailure(failure); + else + trySuccess(result); + }); + } + else + { + trySuccess(response); + } + } + catch (Throwable t) + { + tryFailure(t); + } + + }); + } +} diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java index 4f50442950..1a6c91db4f 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java @@ -18,7 +18,9 @@ package org.apache.cassandra.service.reads.tracked; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -36,15 +38,13 @@ import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.partitions.AbstractBTreePartition; import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; -import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.SimpleBTreePartition; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder; import 
org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.ExcludingBounds; @@ -54,39 +54,45 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Future; -public class PartialTrackedRangeRead extends AbstractPartialTrackedRead +public abstract class PartialTrackedRangeRead extends AbstractPartialTrackedRead { private static final Logger logger = LoggerFactory.getLogger(PartialTrackedRangeRead.class); - private final PartitionRangeReadCommand command; - private final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>(); - private final UnfilteredPartitionIterator initialData; - private final boolean enforceStrictLiveness; + protected static class FollowUpReadInfo + { + int potentialMatches = 0; + } - // short read support - private DecoratedKey lastPartitionKey; // key of the last observed partition - private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call - private boolean initialIteratorExhausted; - private boolean wasAugmented; - AbstractBounds<PartitionPosition> followUpBounds; + protected final PartitionRangeReadCommand command; - public PartialTrackedRangeRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData) + private PartialTrackedRangeRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command) { - super(executionController, searcher, cfs, startTimeNanos); + super(executionController, cfs, 
startTimeNanos); this.command = command; - this.initialData = initialData; - this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } public static PartialTrackedRangeRead create(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData) { - PartialTrackedRangeRead read = new PartialTrackedRangeRead(executionController, searcher, cfs, startTimeNanos, command, initialData); + RowFilter rowFilter = command.rowFilter(); + PartialTrackedRangeRead read; + if (searcher != null) + { + throw new UnsupportedOperationException("TODO: CASSANDRA-20374"); + } + else if (!rowFilter.isEmpty()) + { + read = new PartialTrackedRangeRead.Filtered(executionController, cfs, startTimeNanos, command); + } + else + { + read = new PartialTrackedRangeRead.Simple(executionController, cfs, startTimeNanos, command); + } + try { - read.prepare(); + read.prepare(initialData); return read; } catch (Throwable e) @@ -102,6 +108,227 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead return command; } + protected static class ShortReadSupport + { + final DecoratedKey lastPartitionKey; // key of the last observed partition + final boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + final boolean initialIteratorExhausted; + final AbstractBounds<PartitionPosition> followUpBounds; + boolean wasAugmented; + + ShortReadSupport(Builder builder, boolean initialIteratorExhausted, AbstractBounds<PartitionPosition> followUpBounds) + { + this.lastPartitionKey = builder.lastPartitionKey; + this.partitionsFetched = builder.partitionsFetched; + this.initialIteratorExhausted = initialIteratorExhausted; + this.followUpBounds = followUpBounds; + this.wasAugmented = false; + } + + protected static class Builder + { + final ReadCommand command; + final DataLimits.Counter counter; + 
DecoratedKey lastPartitionKey; // key of the last observed partition + boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + + protected Builder(ReadCommand command) + { + this.command = command; + counter = command.limits().newCounter(command.nowInSec(), + false, + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); + } + + ShortReadSupport build() + { + boolean initialIteratorExhausted = command.limits().isExhausted(counter); + AbstractBounds<PartitionPosition> followUpBounds = null; + if (partitionsFetched) + { + AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange(); + followUpBounds = bounds.inclusiveRight() + ? new Range<>(lastPartitionKey, bounds.right) + : new ExcludingBounds<>(lastPartitionKey, bounds.right); + Preconditions.checkState(!followUpBounds.contains(lastPartitionKey)); + } + return new ShortReadSupport(this, initialIteratorExhausted, followUpBounds); + } + } + } + + private abstract class Materializer extends Transformation<UnfilteredRowIterator> + { + final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>(); + final ShortReadSupport.Builder shortReadSupport; + + private Materializer(ReadCommand command) + { + this.shortReadSupport = new ShortReadSupport.Builder(command); + } + + abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator); + + abstract RangePrepared createRangePrepared(); + + RangePrepared materialize(UnfilteredPartitionIterator inputIterator) + { + try + { + UnfilteredPartitionIterator materialized = Transformation.apply(inputIterator, new Transformation<UnfilteredRowIterator>() + { + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP)); + 
materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter())); + shortReadSupport.lastPartitionKey = partition.partitionKey(); + shortReadSupport.partitionsFetched = true; + return queryPartition(materialized); + } + }); + + UnfilteredPartitionIterator filtered = filter(materialized); + + try (UnfilteredPartitionIterator iterator = shortReadSupport.counter.applyTo(filtered)) + { + consume(iterator); + } + + return createRangePrepared(); + } + finally + { + inputIterator.close(); + } + } + + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP)); + materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter())); + shortReadSupport.lastPartitionKey = partition.partitionKey(); + shortReadSupport.partitionsFetched = true; + return queryPartition(materialized); + } + } + + protected abstract class RangePrepared extends Prepared + { + protected final SortedMap<DecoratedKey, SimpleBTreePartition> data; + protected final ShortReadSupport shortReadSupport; + protected boolean wasAugmented; + + public RangePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport) + { + this.data = data; + this.shortReadSupport = shortReadSupport; + } + + protected boolean canAcceptUpdate(PartitionUpdate update) + { + return shortReadSupport.initialIteratorExhausted || !shortReadSupport.followUpBounds.contains(update.partitionKey()); + } + + private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update) + { + SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP)); + partition.update(update); + return partition; + } + + @Override + public State augment(PartitionUpdate update) + { + // if 
the input iterator reached the row limit, then we can't apply any augmenting mutations that are past + // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an + // update would cause us to return incomplete data for it. + if (canAcceptUpdate(update)) + { + logger.trace("Augmented partition {} for read {}", update.partitionKey(), PartialTrackedRangeRead.this); + augmentResponseInternal(update); + } + else + { + logger.trace("Ignoring unacceptable update from key {} on read {}", update.partitionKey(), PartialTrackedRangeRead.this); + } + wasAugmented = true; + return this; + } + } + + protected abstract class RangeCompleted extends Completed + { + protected final SortedMap<DecoratedKey, SimpleBTreePartition> data; + protected final ShortReadSupport shortReadSupport; + protected final boolean wasAugmented; + + public RangeCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented) + { + this.data = data; + this.shortReadSupport = shortReadSupport; + this.wasAugmented = wasAugmented; + } + + @Override + protected UnfilteredPartitionIterator iterator() + { + Iterator<SimpleBTreePartition> iterator = data.values().iterator(); + return new AbstractUnfilteredPartitionIterator() + { + @Override + public TableMetadata metadata() + { + return command.metadata(); + } + + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } + + @Override + public UnfilteredRowIterator next() + { + return queryPartition(iterator.next()); + } + }; + } + + protected abstract CompletedRead extendRead(UnfilteredPartitionIterator iterator); + + @Override + protected CompletedRead createResult(UnfilteredPartitionIterator iterator) + { + if (wasAugmented) + return extendRead(iterator); + return CompletedRead.simple(iterator, command); + } + + AbstractBounds<PartitionPosition> followUpBounds() + { + return shortReadSupport.followUpBounds; + } + } + + abstract Materializer 
createMaterializer(); + + @Override + protected Prepared prepareInternal(UnfilteredPartitionIterator initialData) + { + // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning + // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated + // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least + // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized + // onto heap at partition granularity until the limits of the read are reached. + + Materializer materializer = createMaterializer(); + return materializer.materialize(initialData); + } + UnfilteredRowIterator queryPartition(AbstractBTreePartition partition) { return partition.unfilteredIterator(command.columnFilter(), @@ -121,227 +348,264 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead } } - @Override - void freezeInitialData() + public AbstractBounds<PartitionPosition> followUpBounds() { - // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning - // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated - // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least - // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized - // onto heap at partition granularity until the limits of the read are reached. 
+ RangeCompleted completed = (RangeCompleted) state().asCompleted(); + return completed.followUpBounds(); + } + + protected static TrackedRead.Range makeFollowUpRead(PartitionRangeReadCommand command, AbstractBounds<PartitionPosition> followUpBounds, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos) + { + DataLimits newLimits = command.limits().forShortReadRetry(toQuery); + + DataRange newDataRange = command.dataRange().forSubRange(followUpBounds); + + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + PartitionRangeReadCommand followUpCmd = command.withUpdatedLimitsAndDataRange(newLimits, newDataRange); + ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forRangeRead(keyspace, + followUpCmd.indexQueryPlan(), + consistencyLevel, + followUpCmd.dataRange().keyRange(), + 1); - UnfilteredPartitionIterator materializer = new AbstractUnfilteredPartitionIterator() + TrackedRead.Range read = TrackedRead.Range.create(followUpCmd, replicaPlan); + logger.trace("Short read detected, starting followup read {}", read); + return read; + } + + static class Simple extends PartialTrackedRangeRead + { + private class SimplePrepared extends RangePrepared { - @Override - public TableMetadata metadata() + public SimplePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport) { - return initialData.metadata(); + super(data, shortReadSupport); } @Override - public boolean hasNext() + Completed complete() { - return initialData.hasNext(); + return new SimpleCompleted(data, shortReadSupport, wasAugmented); } + } - @Override - public UnfilteredRowIterator next() + protected class SimpleCompleted extends RangeCompleted + { + public SimpleCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented) { - try (UnfilteredRowIterator rowIterator = initialData.next()) - { - SimpleBTreePartition partition = 
augmentResponseInternal(PartitionUpdate.fromIterator(rowIterator, command.columnFilter())); - lastPartitionKey = partition.partitionKey(); - partitionsFetched = true; - return queryPartition(partition); - } + super(data, shortReadSupport, wasAugmented); } @Override - public void close() + protected CompletedRead extendRead(UnfilteredPartitionIterator iterator) { - super.close(); - initialData.close(); + return new ExtendingCompletedRead(command, iterator, shortReadSupport.partitionsFetched, shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds); } - }; - - // unmerged per-source counter - final DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(), - false, - command.selectsFullPartition(), - enforceStrictLiveness); - try (UnfilteredPartitionIterator iterator = singleResultCounter.applyTo(materializer)) + } + + public Simple(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command) + { + super(executionController, cfs, startTimeNanos, command); + } + + @Override + Materializer createMaterializer() { - consume(iterator); + return new Materializer(command) + { + @Override + UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator) + { + Preconditions.checkState(command().rowFilter().isEmpty()); + return iterator; + } + + @Override + RangePrepared createRangePrepared() + { + return new SimplePrepared(data, shortReadSupport.build()); + } + }; } - initialIteratorExhausted = command.limits().isExhausted(singleResultCounter); - if (partitionsFetched) + + @Override + public Index.Searcher searcher() { - AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange(); - followUpBounds = bounds.inclusiveRight() - ? 
new Range<>(lastPartitionKey, bounds.right) - : new ExcludingBounds<>(lastPartitionKey, bounds.right); - Preconditions.checkState(!followUpBounds.contains(lastPartitionKey)); + return null; } - wasAugmented = false; } - @Override - UnfilteredPartitionIterator initialData() + + /** + * Since ALLOW FILTERING reads can cover a lot of partitions without returning much data, we don't want to eagerly + * materialize partitions onto the heap and keep them there. So this filters out non-matching partitions from the + * freezeInitialData phase. However, if reconciliation receives a mutation that applies to a previously discarded + * partition AND the contents of that mutation matches the row filter, we also need to retry the read against that + * partition so we don't return incomplete data. This class handles both jobs + */ + static class Filtered extends PartialTrackedRangeRead { - Iterator<SimpleBTreePartition> iterator = data.values().iterator(); - return new AbstractUnfilteredPartitionIterator() + + protected class FilteredPrepared extends RangePrepared { - @Override - public TableMetadata metadata() + private final Set<DecoratedKey> filteredKeys; + private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo = new TreeMap<>(); + private final RowFilter.RowFilterTransformation filter; + public FilteredPrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, Set<DecoratedKey> filteredKeys, RowFilter.RowFilterTransformation filter) { - return command.metadata(); + super(data, shortReadSupport); + this.filteredKeys = filteredKeys; + this.filter = filter; } @Override - public boolean hasNext() + protected boolean canAcceptUpdate(PartitionUpdate update) { - return iterator.hasNext(); + DecoratedKey key = update.partitionKey(); + if (filteredKeys.contains(key)) + { + int matches = filter.potentialMatches(update); + if (matches > 0) + { + FollowUpReadInfo info = followUpReadInfo.computeIfAbsent(key, k -> new 
FollowUpReadInfo()); + info.potentialMatches += matches; + } + logger.trace("Not applying update for previously filtered partition: {}", update.partitionKey()); + return false; + } + return super.canAcceptUpdate(update); } @Override - public UnfilteredRowIterator next() + Completed complete() { - return queryPartition(iterator.next()); + return new FilteredCompleted(data, shortReadSupport, wasAugmented, followUpReadInfo); } - }; - } - - @Override - UnfilteredPartitionIterator augmentedData() - { - return null; - } - - private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update) - { - SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP)); - partition.update(update); - return partition; - } - - @Override - void augmentResponse(PartitionUpdate update) - { - // if the input iterator reached the row limit, then we can't apply any augmenting mutations that are past - // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an - // update would cause us to return incomplete data for it. 
- if (initialIteratorExhausted || !followUpBounds.contains(update.partitionKey())) - augmentResponseInternal(update); - wasAugmented = true; - } + } - private class ExtendingCompletedRead implements CompletedRead - { + protected class FilteredMaterializer extends Materializer + { + private final Set<DecoratedKey> filteredKeys = new HashSet<>(); + private final RowFilter.RowFilterTransformation filter; + public FilteredMaterializer(ReadCommand command) + { + super(command); + filter = command.rowFilter().filter(command().metadata(), command().nowInSec()); + } - final UnfilteredPartitionIterator iterator; - // merged end-result counter - final DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(), - true, - command.selectsFullPartition(), - enforceStrictLiveness); + @Override + UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator) + { + return Transformation.apply(iterator, new Transformation<>() + { + @Override + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + if (Transformation.apply(partition, filter).isEmpty()) + { + DecoratedKey key = partition.partitionKey(); + data.remove(key); + filteredKeys.add(key); + partition.close(); + return null; + } + return partition; + } + }); + } - public ExtendingCompletedRead(UnfilteredPartitionIterator iterator) - { - this.iterator = iterator; + @Override + RangePrepared createRangePrepared() + { + return new FilteredPrepared(data, shortReadSupport.build(), filteredKeys, filter); + } } - @Override - public PartitionIterator iterator() + static class FilteredCompletedRead extends ExtendingCompletedRead { - PartitionIterator filtered = UnfilteredPartitionIterators.filter(iterator, command.nowInSec()); - PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); - return Transformation.apply(counted, new EmptyPartitionsDiscarder()); - } + private final DecoratedKey lastMatchingKey; + private final 
SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo; + public FilteredCompletedRead(PartitionRangeReadCommand command, UnfilteredPartitionIterator iterator, ShortReadSupport shortReadSupport, DecoratedKey lastMatchingKey, SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo) + { + super(command, iterator, shortReadSupport.partitionsFetched, shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds); + this.lastMatchingKey = lastMatchingKey; + this.followUpReadInfo = followUpReadInfo; + } - @Override - public TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long expiresAtNanos) - { - // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit - if (mergedResultCounter.isDone()) - return null; + /** + * Even if we reached the limit during materialization, if there are keys ahead of the first materialized key + * or interleaved with them, then we need to read them + * @return + */ + private boolean hasInterleavedFollowupKeys() + { + if (followUpReadInfo.isEmpty()) + return false; - // we do not apply short read protection when we have no limits at all - if (command.limits().isUnlimited()) - return null; + if (lastMatchingKey == null) // null means there was no data and therefore no interleaving + return true; - /* - * If this is a single partition read command or an (indexed) partition range read command with - * a partition key specified, then we can't and shouldn't try fetch more partitions. - */ - if (command.isLimitedToOnePartition()) - return null; - - /* - * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. - * - * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false - * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). 
- */ - if (initialIteratorExhausted && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) - return null; + return followUpReadInfo.firstKey().compareTo(lastMatchingKey) < 0; + } - /* - * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. - * There is no point to ask the replica for more rows - it has no more in the requested range. - */ - if (!partitionsFetched) - return null; - partitionsFetched = false; - - /* - * We are going to fetch one partition at a time for thrift and potentially more for CQL. - * The row limit will either be set to the per partition limit - if the command has no total row limit set, or - * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, - * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. - */ - int toQuery = command.limits().count() != DataLimits.NO_LIMIT - ? command.limits().count() - mergedResultCounter.rowsCounted() - : command.limits().perPartitionCount(); + @Override + protected boolean followUpRequired() + { + return hasInterleavedFollowupKeys() || super.followUpRequired(); + } - ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); - Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort()); - logger.info("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort()); + @Override + protected Future<TrackedDataResponse> makeFollowupRead(TrackedDataResponse initialResponse, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos) + { + if (followUpReadInfo.isEmpty()) + return super.makeFollowupRead(initialResponse, toQuery, consistencyLevel, expiresAtNanos); + + FilteredFollowupRead followupRead = new FilteredFollowupRead(initialResponse, + toQuery, + consistencyLevel, + expiresAtNanos, + followUpReadInfo, + 
command, + followUpBounds, + lastMatchingKey); + + followupRead.start(); + return followupRead; + } + } - private TrackedRead<?, ?> makeFollowupRead(int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos) + private class FilteredCompleted extends RangeCompleted { - DataLimits newLimits = command.limits().forShortReadRetry(toQuery); - - DataRange newDataRange = command.dataRange().forSubRange(followUpBounds); + private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo; + public FilteredCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented, SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo) + { + super(data, shortReadSupport, wasAugmented); + this.followUpReadInfo = followUpReadInfo; + } - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - PartitionRangeReadCommand followUpCmd = command.withUpdatedLimitsAndDataRange(newLimits, newDataRange); - ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forRangeRead(keyspace, - followUpCmd.indexQueryPlan(), - consistencyLevel, - followUpCmd.dataRange().keyRange(), - 1); + @Override + protected CompletedRead extendRead(UnfilteredPartitionIterator iterator) + { + return new FilteredCompletedRead(command, iterator, shortReadSupport, data.isEmpty() ? 
null /* nothing was materialized: lastKey() throws on an empty map, and a null lastMatchingKey is how FilteredCompletedRead recognises "no data" */ : data.lastKey(), followUpReadInfo); + } + } - TrackedRead.Range read = TrackedRead.Range.create(followUpCmd, replicaPlan); - logger.trace("Short read detected, starting followup read {}", read); - read.start(expiresAtNanos); - return read; + public Filtered(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command) + { + super(executionController, cfs, startTimeNanos, command); } @Override - public void close() + Materializer createMaterializer() { - iterator.close(); + return new FilteredMaterializer(command); } - } - @Override - CompletedRead createResult(UnfilteredPartitionIterator iterator) - { - if (wasAugmented) - return new ExtendingCompletedRead(iterator); - return CompletedRead.simple(iterator, command().nowInSec()); + @Override + public Index.Searcher searcher() + { + return null; + } } } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java index daf222239c..99e509ee7f 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java @@ -25,33 +25,46 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.index.Index; +import org.apache.cassandra.utils.concurrent.Future; public interface PartialTrackedRead { interface CompletedRead extends AutoCloseable { - PartitionIterator iterator(); - TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long 
expiresAtNanos); + TrackedDataResponse response(); // must be called from the read stage + Future<TrackedDataResponse> followupRead(TrackedDataResponse initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos); @Override void close(); - static CompletedRead simple(UnfilteredPartitionIterator partition, long nowInSec) + static TrackedDataResponse createResponse(UnfilteredPartitionIterator partition, ReadCommand command) + { + PartitionIterator iterator = UnfilteredPartitionIterators.filter(partition, command.nowInSec()); + DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), + false, + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()).onlyCount(); + return TrackedDataResponse.create(counter.applyTo(iterator), + command.columnFilter()); + } + + static CompletedRead simple(UnfilteredPartitionIterator partition, ReadCommand command) { return new CompletedRead() { @Override - public PartitionIterator iterator() + public TrackedDataResponse response() { - return UnfilteredPartitionIterators.filter(partition, nowInSec); + return createResponse(partition, command); } @Override - public TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long expiresAtNanos) + public Future<TrackedDataResponse> followupRead(TrackedDataResponse initialRead, ConsistencyLevel consistencyLevel, long expiresAtNanos) { return null; } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java index 6aa64a2985..e15dba8a5c 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java @@ -18,6 +18,8 @@ package org.apache.cassandra.service.reads.tracked; +import java.util.List; + import com.google.common.base.Preconditions; import 
org.apache.cassandra.db.ColumnFamilyStore; @@ -29,29 +31,31 @@ import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.SimpleBTreePartition; import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.transactions.UpdateTransaction; +import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; + public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRead { + private final Index.Searcher searcher; private final SinglePartitionReadCommand command; - private final UnfilteredPartitionIterator initialData; - private SimpleBTreePartition augmentedData; - public PartialTrackedSinglePartitionRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command, UnfilteredPartitionIterator initialData) + public PartialTrackedSinglePartitionRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command) { - super(executionController, searcher, cfs, startTimeNanos); + super(executionController, cfs, startTimeNanos); + this.searcher = searcher; this.command = command; - this.initialData = initialData; } public static PartialTrackedSinglePartitionRead create(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command, UnfilteredPartitionIterator initialData) { - PartialTrackedSinglePartitionRead read = new PartialTrackedSinglePartitionRead(executionController, searcher, cfs, startTimeNanos, command, initialData); + 
PartialTrackedSinglePartitionRead read = new PartialTrackedSinglePartitionRead(executionController, searcher, cfs, startTimeNanos, command); try { - read.prepare(); + read.prepare(initialData); return read; } catch (Throwable e) @@ -61,48 +65,88 @@ public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRea } } - @Override - void freezeInitialData() + private class SinglePartitionPrepared extends Prepared { - // the iterators from queryStorage grabs sstable references and a - // snapshot of the memtable partition so we don't need to do anything here - } + private final UnfilteredPartitionIterator initialData; + private SimpleBTreePartition augmentedData; - @Override - public ReadCommand command() - { - return command; + private SinglePartitionPrepared(UnfilteredPartitionIterator initialData) + { + this.initialData = initialData; + } + + @Override + public State augment(PartitionUpdate update) + { + Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey())); + if (augmentedData == null) + augmentedData = new SimpleBTreePartition(command.partitionKey(), command.metadata(), UpdateTransaction.NO_OP); + + augmentedData.update(update); + return this; + } + + @Override + Completed complete() + { + return new SinglePartitionCompleted(initialData, augmentedData); + } } - @Override - UnfilteredPartitionIterator initialData() + private class SinglePartitionCompleted extends Completed { - return initialData; + private final UnfilteredPartitionIterator initialData; + private final SimpleBTreePartition augmentedData; + + public SinglePartitionCompleted(UnfilteredPartitionIterator initialData, SimpleBTreePartition augmentedData) + { + this.initialData = initialData; + this.augmentedData = augmentedData; + } + + private UnfilteredPartitionIterator augmentedIterator() + { + if (augmentedData == null) + return null; + Slices slices = command.clusteringIndexFilter().getSlices(command.metadata()); + UnfilteredRowIterator augmentedPartition = 
augmentedData.unfilteredIterator(command.columnFilter(), slices, command.clusteringIndexFilter().isReversed()); + return new SingletonUnfilteredPartitionIterator(augmentedPartition); + } + + @Override + protected UnfilteredPartitionIterator iterator() + { + UnfilteredPartitionIterator augmentedIterator = augmentedIterator(); + if (augmentedIterator == null) + return initialData; + + return UnfilteredPartitionIterators.merge(List.of(initialData, augmentedIterator), NOOP); + } + + @Override + protected CompletedRead createResult(UnfilteredPartitionIterator iterator) + { + return CompletedRead.simple(iterator, command); + } } @Override - UnfilteredPartitionIterator augmentedData() + protected Prepared prepareInternal(UnfilteredPartitionIterator initialData) { - if (augmentedData == null) - return null; - Slices slices = command.clusteringIndexFilter().getSlices(command.metadata()); - UnfilteredRowIterator augmented = augmentedData.unfilteredIterator(command.columnFilter(), slices, command.clusteringIndexFilter().isReversed()); - return new SingletonUnfilteredPartitionIterator(augmented); + return new SinglePartitionPrepared(initialData); } @Override - void augmentResponse(PartitionUpdate update) + public Index.Searcher searcher() { - Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey())); - if (augmentedData == null) - augmentedData = new SimpleBTreePartition(command.partitionKey(), command.metadata(), UpdateTransaction.NO_OP); - - augmentedData.update(update); + return searcher; } + // TODO: delete (almost?) 
ever + @Override - CompletedRead createResult(UnfilteredPartitionIterator iterator) + public ReadCommand command() { - return CompletedRead.simple(iterator, command().nowInSec()); + return command; } } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java index b55095bf02..01fad19fb5 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java @@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.tracked; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.io.IVersionedSerializer; @@ -33,18 +34,63 @@ import org.apache.cassandra.utils.ByteBufferUtil; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Preconditions; public class TrackedDataResponse { private final int serializationVersion; - private final ByteBuffer data; + private final List<ByteBuffer> data; public TrackedDataResponse(int serializationVersion, ByteBuffer data) { + this(serializationVersion, Collections.singletonList(data)); + } + + private TrackedDataResponse(int serializationVersion, List<ByteBuffer> data) + { + Preconditions.checkArgument(!data.isEmpty()); this.serializationVersion = serializationVersion; this.data = data; } + public TrackedDataResponse merge(TrackedDataResponse that) + { + return merge(this, that); + } + + public static TrackedDataResponse merge(TrackedDataResponse l, TrackedDataResponse r) + { + Preconditions.checkArgument(l.serializationVersion 
== r.serializationVersion); + List<ByteBuffer> newData = new ArrayList<>(l.data.size() + r.data.size()); + newData.addAll(l.data); + newData.addAll(r.data); + return new TrackedDataResponse(l.serializationVersion, newData); + } + + public static TrackedDataResponse merge(List<TrackedDataResponse> responses) + { + Preconditions.checkArgument(!responses.isEmpty()); + + int version = responses.get(0).serializationVersion; + int size = responses.get(0).data.size(); + + for (int i=1,mi=responses.size(); i<mi; i++) + { + Preconditions.checkState(responses.get(i).serializationVersion == version); + size += responses.get(i).data.size(); + } + + List<ByteBuffer> newData = new ArrayList<>(size); + for (int i=0,mi=responses.size(); i<mi; i++) + newData.addAll(responses.get(i).data); + + return new TrackedDataResponse(version, newData); + } + public static TrackedDataResponse create(PartitionIterator iter, ColumnFilter selection) { try (DataOutputBuffer buffer = new DataOutputBuffer()) @@ -59,7 +105,7 @@ public class TrackedDataResponse } } - public PartitionIterator makeIterator(ReadCommand command) + private static PartitionIterator makeIterator(int serializationVersion, ByteBuffer data, ReadCommand command) { try (DataInputBuffer in = new DataInputBuffer(data, true)) { @@ -72,28 +118,55 @@ public class TrackedDataResponse } } - public static final IVersionedSerializer<TrackedDataResponse> serializer = new IVersionedSerializer<TrackedDataResponse>() + public PartitionIterator makeIteratorUnlimited(ReadCommand command) + { + if (data.size() == 1) + return makeIterator(serializationVersion, data.get(0), command); + + List<PartitionIterator> iterators = new ArrayList<>(data.size()); + for (ByteBuffer buffer : data) + iterators.add(makeIterator(serializationVersion, buffer, command)); + return PartitionIterators.mergeNonOverlapping(iterators); + } + + public PartitionIterator makeIterator(ReadCommand command) + { + DataLimits.Counter counter = 
command.limits().newCounter(command.nowInSec(), + true, + command.selectsFullPartition(), + command.metadata().enforceStrictLiveness()); + return counter.applyTo(makeIteratorUnlimited(command)); + } + + public static final IVersionedSerializer<TrackedDataResponse> serializer = new IVersionedSerializer<>() { @Override public void serialize(TrackedDataResponse response, DataOutputPlus out, int version) throws IOException { out.writeInt(response.serializationVersion); - ByteBufferUtil.writeWithVIntLength(response.data, out); + out.writeInt(response.data.size()); + for (ByteBuffer buffer : response.data) + ByteBufferUtil.writeWithVIntLength(buffer, out); } @Override public TrackedDataResponse deserialize(DataInputPlus in, int version) throws IOException { int serializationVersion = in.readInt(); - ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); + int size = in.readInt(); + List<ByteBuffer> data = new ArrayList<>(size); + for (int i = 0; i < size; i++) + data.add(ByteBufferUtil.readWithVIntLength(in)); return new TrackedDataResponse(serializationVersion, data); } @Override public long serializedSize(TrackedDataResponse response, int version) { - return TypeSizes.sizeof(response.serializationVersion) - + ByteBufferUtil.serializedSizeWithVIntLength(response.data); + long size = TypeSizes.sizeof(response.serializationVersion) + TypeSizes.sizeof(response.data.size()); + for (ByteBuffer buffer : response.data) + size += ByteBufferUtil.serializedSizeWithVIntLength(buffer); + return size; } }; } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java index 6732adca4b..c62531240a 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java @@ -27,8 +27,6 @@ import org.apache.cassandra.db.Mutation; import 
org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; @@ -45,6 +43,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.AbstractFuture; import org.apache.cassandra.utils.concurrent.Accumulator; import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +52,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; public class TrackedLocalReadCoordinator { @@ -512,7 +513,7 @@ public class TrackedLocalReadCoordinator }); } - public void startLocalRead(TrackedRead.Id readId, ReadCommand command, ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long expiresAtNanos) + public void startLocalRead(TrackedRead.Id readId, ReadCommand command, ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long expiresAtNanos, Consumer<PartialTrackedRead> partialReadConsumer) { synchronized (this) { @@ -528,6 +529,8 @@ public class TrackedLocalReadCoordinator try { read = command.beginTrackedRead(controller); + if (partialReadConsumer != null) + partialReadConsumer.accept(read); // Create another summary once initial data has been read fully. We do this to catch // any mutations that may have arrived during initial read execution. 
secondarySummary = command.createMutationSummary(true); @@ -555,20 +558,17 @@ public class TrackedLocalReadCoordinator { try (PartialTrackedRead.CompletedRead completedRead = read.complete()) { - TrackedDataResponse response = TrackedDataResponse.create(completedRead.iterator(), selection); - TrackedRead<?, ?> followUp = completedRead.followupRead(consistencyLevel, expiresAtNanos); + TrackedDataResponse response = completedRead.response(); + Future<TrackedDataResponse> followUp = completedRead.followupRead(response, consistencyLevel, expiresAtNanos); if (followUp != null) { - ReadCommand command = read.command(); - followUp.future().addCallback((iterator, error) -> { + followUp.addCallback((newResponse, error) -> { if (error != null) { promise.tryFailure(error); return; } - PartitionIterator previous = response.makeIterator(command); - TrackedDataResponse newResponse = TrackedDataResponse.create(PartitionIterators.concat(List.of(previous, iterator)), selection); promise.trySuccess(newResponse); }); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java index 5c2417e6b5..d356ce2efa 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -89,7 +90,7 @@ public class TrackedLocalReads implements Shutdownable getOrCreate(summary.readId()).receiveSummary(from, summary.summary()); } - public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId, ClusterMetadata metadata, ReadCommand command, ConsistencyLevel consistencyLevel, int[] 
summaryNodes, long expiresAtNanos) + public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId, ClusterMetadata metadata, ReadCommand command, ConsistencyLevel consistencyLevel, int[] summaryNodes, long expiresAtNanos, Consumer<PartialTrackedRead> partialReadConsumer) { Keyspace keyspace = Keyspace.open(command.metadata().keyspace); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); @@ -116,7 +117,7 @@ public class TrackedLocalReads implements Shutdownable // TODO: confirm all summaryNodes are present in the replica plan TrackedLocalReadCoordinator coordinator = getOrCreate(readId); - coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes, expiresAtNanos); + coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes, expiresAtNanos, partialReadConsumer); return coordinator; } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java index 9109b69c01..0d029a554c 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java @@ -53,6 +53,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +134,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. } } - private final AsyncPromise<PartitionIterator> future = new AsyncPromise<>(); + private final AsyncPromise<TrackedDataResponse> future = new AsyncPromise<>(); private final Id readId = Id.nextId(); private final ReadCommand command; @@ -172,6 +173,11 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. 
protected abstract Verb verb(); + public boolean intersects(DecoratedKey key) + { + return command.dataRange().contains(key); + } + public static class Partition extends TrackedRead<EndpointsForToken, ReplicaPlan.ForTokenRead> { private Partition(SinglePartitionReadCommand command, ReplicaPlan.AbstractForRead<EndpointsForToken, ReplicaPlan.ForTokenRead> replicaPlan, ConsistencyLevel consistencyLevel) @@ -233,7 +239,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. return hostids; } - public void start(long expiresAt) + private void start(long expiresAt, Consumer<PartialTrackedRead> partialReadConsumer) { // TODO: skip local coordination if this node knows its recovering from an outage // TODO: read speculation @@ -256,8 +262,9 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. if (dataNode == localReplica) { + logger.trace("Locally coordinating {}", readId); Stage.READ.submit(() -> { - TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryHostIds, expiresAt); + TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryHostIds, expiresAt, partialReadConsumer); coordinator.addCallback((response, error) -> { if (error != null) { @@ -272,6 +279,8 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. 
} else { + logger.trace("Sending data request for {} to {}", readId, dataNode.endpoint()); + Preconditions.checkArgument(partialReadConsumer == null, "Cannot supply read consumer for nonlocal reads"); DataRequest dataRequest = new DataRequest(readId, command, consistencyLevel, summaryHostIds); Message<DataRequest> dataMessage = Message.outWithFlag(verb(), dataRequest, MessageFlag.CALL_BACK_ON_FAILURE); MessagingService.instance().sendWithCallback(dataMessage, dataNode.endpoint(), this); @@ -289,15 +298,27 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. { if (localReplica == replica) { + logger.trace("Locally processing summary request for {}", readId); Stage.READ.submit(() -> summaryRequest.executeLocally(summaryMessage, ClusterMetadata.current())); } else { + logger.trace("Sending summary request for {} to {}", readId, replica.endpoint()); MessagingService.instance().send(summaryMessage, replica.endpoint()); } } } + public void start(long expiresAt) + { + start(expiresAt, null); + } + + public void startLocal(long expiresAt, Consumer<PartialTrackedRead> partialReadConsumer) + { + start(expiresAt, partialReadConsumer); + } + public void start(Dispatcher.RequestTime requestTime) { start(requestTime.computeDeadline(verb().expiresAfterNanos())); @@ -305,7 +326,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. private void onResponse(TrackedDataResponse response) { - future.trySuccess(response.makeIterator(command)); + future.trySuccess(response); } @Override @@ -326,7 +347,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. return true; } - public Future<PartitionIterator> future() + public Future<TrackedDataResponse> future() { return future; } @@ -335,7 +356,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. 
{ try { - return future.get(command.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + return future.get(command.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS).makeIterator(command); } catch (InterruptedException e) { @@ -413,7 +434,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan. @Override public void executeLocally(Message<? extends Request> message, ClusterMetadata metadata) { - TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, metadata, command, consistencyLevel, summaryNodes, message.expiresAtNanos()); + TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, metadata, command, consistencyLevel, summaryNodes, message.expiresAtNanos(), null); coordinator.addCallback((response, error) -> { if (error != null) { diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml index 22d2e9faa4..ee79d68c87 100644 --- a/test/conf/logback-dtest.xml +++ b/test/conf/logback-dtest.xml @@ -45,9 +45,9 @@ <encoder> <pattern>%-5level [%thread] ${instance_id} %date{"yyyy-MM-dd'T'HH:mm:ss,SSS", UTC} %F:%L - %msg%n</pattern> </encoder> - <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> - <level>DEBUG</level> - </filter> +<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">--> +<!-- <level>DEBUG</level>--> +<!-- </filter>--> </appender> <logger name="io.netty.*" level="WARN"/> @@ -63,4 +63,6 @@ <appender-ref ref="INSTANCESTDERR" /> <appender-ref ref="INSTANCESTDOUT" /> </root> + <logger name="org.apache.cassandra.replication" level="TRACE"/> + <logger name="org.apache.cassandra.service.reads.tracked" level="TRACE"/> </configuration> diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java index b2af1f938e..3ad81cdc9e 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java @@ -18,9 +18,15 @@ package org.apache.cassandra.distributed.test; +import java.util.Arrays; +import java.util.Comparator; + +import org.junit.Assume; import org.junit.Test; -import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.distributed.shared.AssertUtils.row; @@ -156,7 +162,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester @Test public void testRangeQueryWithFilterOnSelectedColumnOnSkinnyTable() { - MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)"); tester("WHERE a=2 ALLOW FILTERING") .createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)") .mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)", @@ -183,7 +188,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester @Test public void testRangeQueryWithFilterOnSelectedColumnOnWideTable() { - MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)"); tester("WHERE a=1 ALLOW FILTERING") .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))") .mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)", @@ -215,8 +219,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester @Test public void testRangeQueryWithFilterOnUnselectedColumnOnSkinnyTable() { - MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)"); - tester("WHERE b=3 ALLOW FILTERING") .createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)") .mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)", @@ -243,8 +245,6 @@ public class 
ReadRepairRangeQueriesTest extends ReadRepairQueryTester @Test public void testRangeQueryWithFilterOnUnselectedColumnOnWideTable() { - if (coordinator == 2) - MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends on ALLOW FILTERING"); tester("WHERE b=2 ALLOW FILTERING") .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))") .mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)", @@ -269,4 +269,113 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester rows(row(2, 1, 1, 1)), rows()); } + + /** + * Test range queries using filtering on a selected column on a table with clustering columns. + */ + @Test + public void testRangeQueryWithFilterOnSelectedColumnConflictingUpdates() + { + Assume.assumeTrue(replicationType.isTracked()); // flaky for untracked replication + tester("WHERE a=1 ALLOW FILTERING") + .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))") + .mutate(1, "INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)", + "INSERT INTO %s (k, c, a, b) VALUES (1, 2, 2, 2)", + "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 2, 1)", + "INSERT INTO %s (k, c, a, b) VALUES (2, 2, 2, 2)") + .mutate(2, "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 1, 1)") + .mutate(1) // updates the last coordinator for mutation tracking + .queryColumns("k, c, a", 2, 2, + rows(row(1, 1, 1), row(2, 1, 1)), + rows(row(1, 1, 1, 1), row(2, 1, 1, 1)), + rows(row(1, 1, 1, null), row(2, 1, 1, 1))) + .tearDown(2, + rows(row(1, 1, 1, 1), row(1, 2, 2, 2), row(2, 1, 1, 1), row(2, 2, 2, 2)), + rows(row(1, 1, 1, 1), row(2, 1, 1, 1))); + } + + /** + * Helper class for creating rows in token sorted order + */ + private static class TokenSortedKey + { + final int key; + + public TokenSortedKey(int key) + { + this.key = key; + } + + Token token() + { + return Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key)); + } + + public Object[] row(Object... 
values) + { + Object[] row = new Object[values.length + 1]; + row[0] = key; + System.arraycopy(values, 0, row, 1, values.length); + return row; + } + + public static TokenSortedKey[] create(int numKeys) + { + TokenSortedKey[] keys = new TokenSortedKey[numKeys]; + for (int i = 0; i < numKeys; i++) + keys[i] = new TokenSortedKey(i); + Arrays.sort(keys, Comparator.comparing(TokenSortedKey::token)); + return keys; + } + + } + + /** + * Tests the allow filtering case where the local coordinator filters out a partition that hashes to a token + * between 2 valid matches and another replica reports a mutation that causes it to be re-added to the result + */ + @Test + public void testFilteredInterleavedShortRead() + { + Assume.assumeTrue(replicationType.isTracked()); // flaky for untracked replication + TokenSortedKey[] keys = TokenSortedKey.create(4); + + tester("WHERE a=1 ALLOW FILTERING") + .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k, c))") + .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1, 1)", + "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)", + "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 2)", + "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)") + .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 1)") // make keys[1] match the filter + .mutate(1) // updates the last coordinator for mutation tracking + .queryColumns("k, c, a", 3, 0, + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1))) + .tearDown(1, + rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1))); + } + + @Test + public void testNonFilteredInterleavedShortRead() + { + Assume.assumeTrue(replicationType.isTracked()); // flaky for untracked replication + TokenSortedKey[] keys = 
TokenSortedKey.create(4); + + tester("WHERE a=1 ALLOW FILTERING") + .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k, c))") + .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1, 1)", + "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)", + "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)") + .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 1)") // make keys[1] match the filter + .mutate(1) // updates the last coordinator for mutation tracking + .queryColumns("k, c, a", 3, 0, + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1))) + .tearDown(1, + rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1, 1), keys[2].row(1, 1)), + rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1))); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java index 58e9ecca11..62f3702d50 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java @@ -20,8 +20,6 @@ package org.apache.cassandra.distributed.test; import org.junit.Test; -import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils; - import static org.apache.cassandra.distributed.shared.AssertUtils.row; /** @@ -64,7 +62,6 @@ public class ReadRepairSliceQueriesTest extends ReadRepairQueryTester @Test public void testSliceQueryWithFilter() { - MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends on ALLOW FILTERING"); tester("WHERE k=0 AND a>10 AND a<40 ALLOW FILTERING") .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))") .mutate("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 10, 100)", 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java index 7e17c9c557..7d859ce110 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java @@ -176,7 +176,7 @@ public class MutationTrackingPendingReadTest // check that the returned data contains the unapplied mutation try (PartialTrackedRead.CompletedRead completedRead = read.complete(); - PartitionIterator partitions = completedRead.iterator()) + PartitionIterator partitions = completedRead.response().makeIterator(command)) { Assert.assertTrue(partitions.hasNext()); try (RowIterator rowIterator = partitions.next()) diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java index 306d6e5092..37f6c6a46f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java @@ -21,6 +21,8 @@ package org.apache.cassandra.distributed.test.tracking; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.replication.CoordinatorLogId; import org.apache.cassandra.replication.MutationSummary; import org.junit.Assert; @@ -77,6 +79,31 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl 
awaitNodeLiveness(from, InetAddressAndPort.getByAddress(node.broadcastAddress()), true); } + // TODO (expected): remove after speculation is implemented + private static Object[][] queryWithRetries(ICoordinator coordinator, String query) throws InterruptedException + { + int attempt = 0; + for (;;) + { + attempt++; + try + { + return coordinator.execute(query, ConsistencyLevel.QUORUM); + } + catch (Throwable t) + { + if (attempt < 10 && t.getClass().getSimpleName().equals(ReadTimeoutException.class.getSimpleName())) + { + Thread.sleep(2000); + } + else + { + throw t; + } + } + } + } + /** * Test a read reconciliation where the coordinator doesn't have a read response it needs to apply * additional mutations to @@ -146,9 +173,8 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl awaitNodeAlive(cluster.get(1), cluster.get(3)); awaitNodeDead(cluster.get(1), cluster.get(2)); - Assert.assertEquals(0, numLogReconciliations(cluster.get(1))); - Object[][] result = cluster.coordinator(1).execute(format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM); + Object[][] result = queryWithRetries(cluster.coordinator(1), format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName)); Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result); // check that node3 has the new ids @@ -225,7 +251,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl Assert.assertEquals(0, numLogReconciliations(cluster.get(1))); - Object[][] result = cluster.coordinator(3).execute(format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM); + Object[][] result = queryWithRetries(cluster.coordinator(3), format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName)); Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result); // check that node3 has the new ids @@ -302,7 +328,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl // No reconciliation has 
happened yet Assert.assertEquals(0, numLogReconciliations(cluster.get(1))); - Object[][] result = cluster.coordinator(1).execute(format("SELECT * FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM); + Object[][] result = queryWithRetries(cluster.coordinator(1), format("SELECT * FROM %s.%s", keyspaceName, tableName)); Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)), result); // Coordinator sends its missing mutations to 3 on read @@ -377,7 +403,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl Assert.assertEquals(0, numLogReconciliations(cluster.get(3))); - Object[][] result = cluster.coordinator(3).execute(format("SELECT * FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM); + Object[][] result = queryWithRetries(cluster.coordinator(3), format("SELECT * FROM %s.%s", keyspaceName, tableName)); Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)), result); // Coordinator sends its missing mutations to 3 on read --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
