This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 7cddbd40ce6b326df533fd6d3c4131ef70b3b068 Merge: 2121767 f258ae6 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Feb 15 12:50:38 2021 +0100 Merge branch 'cassandra-3.11' into trunk .../org/apache/cassandra/db/filter/DataLimits.java | 28 ++++++++++---------- .../reads/ShortReadPartitionsProtection.java | 2 +- .../service/reads/ShortReadRowsProtection.java | 2 +- .../cassandra/distributed/test/GroupByTest.java | 30 ++++++++++++++++++++-- 4 files changed, 44 insertions(+), 18 deletions(-) diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index 6c4dc68,0000000..51043c3 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@@ -1,199 -1,0 +1,199 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MorePartitions; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.ExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.tracing.Tracing; + +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> +{ + private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class); + private final ReadCommand command; + private final Replica source; + + private final Runnable preFetchCallback; // called immediately before fetching more contents + + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + + 
private DecoratedKey lastPartitionKey; // key of the last observed partition + + private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + + private final long queryStartNanoTime; + + public ShortReadPartitionsProtection(ReadCommand command, + Replica source, + Runnable preFetchCallback, + DataLimits.Counter singleResultCounter, + DataLimits.Counter mergedResultCounter, + long queryStartNanoTime) + { + this.command = command; + this.source = source; + this.preFetchCallback = preFetchCallback; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.queryStartNanoTime = queryStartNanoTime; + } + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + partitionsFetched = true; + + lastPartitionKey = partition.partitionKey(); + + /* + * Extend for moreContents() then apply protection to track lastClustering by applyToRow(). + * + * If we don't apply the transformation *after* extending the partition with MoreRows, + * applyToRow() method of protection will not be called on the first row of the new extension iterator. + */ + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source); + ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); + ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(), + command, source, + (cmd) -> executeReadCommand(cmd, sharedReplicaPlan), + singleResultCounter, + mergedResultCounter); + return Transformation.apply(MoreRows.extend(partition, protection), protection); + } + + /* + * We only get here once all the rows and partitions in this iterator have been iterated over, and so + * if the node had returned the requested number of rows but we still get here, then some results were + * skipped during reconciliation. 
+ */ + public UnfilteredPartitionIterator moreContents() + { + // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit + assert !mergedResultCounter.isDone(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + /* + * If this is a single partition read command or an (indexed) partition range read command with + * a partition key specified, then we can't and shouldn't try fetch more partitions. + */ + assert !command.isLimitedToOnePartition(); + + /* + * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). + */ - if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) ++ if (command.limits().isExhausted(singleResultCounter) && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return null; + + /* + * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. + * There is no point to ask the replica for more rows - it has no more in the requested range. + */ + if (!partitionsFetched) + return null; + partitionsFetched = false; + + /* + * We are going to fetch one partition at a time for thrift and potentially more for CQL. + * The row limit will either be set to the per partition limit - if the command has no total row limit set, or + * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, + * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. + */ + int toQuery = command.limits().count() != DataLimits.NO_LIMIT + ? 
command.limits().count() - mergedResultCounter.rowsCounted() + : command.limits().perPartitionCount(); + + ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); + logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source); + + // If we've arrived here, all responses have been consumed, and we're about to request more. + preFetchCallback.run(); + + return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery); + } + + private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery) + { + PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command; + + DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery); + + AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange(); + AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight() + ? new Range<>(lastPartitionKey, bounds.right) + : new ExcludingBounds<>(lastPartitionKey, bounds.right); + DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); + + ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1); + return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan)); + } + + private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan) + { + DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); + ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); + + if (source.isSelf()) + { + Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + } + else + { + if 
(source.isTransient()) + cmd = cmd.copyAsTransientQuery(source); + MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler); + } + + // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. + handler.awaitResults(); + assert resolver.getMessages().size() == 1; + return resolver.getMessages().get(0).payload.makeIterator(command); + } +} diff --cc src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java index 9ba074d,0000000..8061f0a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java @@@ -1,189 -1,0 +1,189 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import java.util.function.Function; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tracing.Tracing; + +class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator> +{ + private final ReadCommand command; + private final Replica source; + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor; + private final TableMetadata metadata; + private final DecoratedKey partitionKey; + + private Clustering<?> lastClustering; // clustering of the last observed row + + private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows + private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command) + private int lastQueried = 0; // # extra rows requested from the replica last time + + ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source, + Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor, + 
DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter) + { + this.command = command; + this.source = source; + this.commandExecutor = commandExecutor; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.metadata = command.metadata(); + this.partitionKey = partitionKey; + } + + @Override + public Row applyToRow(Row row) + { + lastClustering = row.clustering(); + return row; + } + + /* + * We only get here once all the rows in this iterator have been iterated over, and so if the node + * had returned the requested number of rows but we still get here, then some results were skipped + * during reconciliation. + */ + public UnfilteredRowIterator moreContents() + { + // never try to request additional rows from replicas if our reconciled partition is already filled to the limit + assert !mergedResultCounter.isDoneForPartition(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + /* + * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). + */ - if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) ++ if (command.limits().isExhausted(singleResultCounter) && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return null; + + /* + * If the replica has no live rows in the partition, don't try to fetch more. 
+ * + * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't + * always cover this scenario: + * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], + * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. + * + * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch + * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only + * have tombstones in the current partition. + * + * One other way we can hit this condition is when the partition only has a live static row and no regular + * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after + * the moreContents() call. + */ + if (singleResultCounter.rowsCountedInCurrentPartition() == 0) + return null; + + /* + * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. + * We already have the row, so there is no point in asking for more from the partition. + */ + if (lastClustering != null && lastClustering.isEmpty()) + return null; + + lastFetched = singleResultCounter.rowsCountedInCurrentPartition() - lastCounted; + lastCounted = singleResultCounter.rowsCountedInCurrentPartition(); + + // getting back fewer rows than we asked for means the partition on the replica has been fully consumed + if (lastQueried > 0 && lastFetched < lastQueried) + return null; + + /* + * At this point we know that: + * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more + * rows in the partition + * 2. at least one of those returned rows was shadowed by a tombstone returned from another + * replica + * 3. 
+ we haven't satisfied the client's limits yet, and should attempt to query for more rows to + * avoid a short read + * + * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b + * are defined as follows: + * [a] limits.count() - mergedResultCounter.counted() + * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() + * + * It would be naive to query for exactly that many rows, as it's possible and not unlikely + * that some of the returned rows would also be shadowed by tombstones from other hosts. + * + * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; + * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. + * + * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number + * of rows fetched: there is a high transactional cost for every individual request, but a relatively low + * marginal cost for each extra row requested. + * + * As such it's better to overfetch than to underfetch extra rows from a host; but at the same + * time we want to respect paging limits and not blow up spectacularly. + * + * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only + * counts. + * + * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits. + * + * See CASSANDRA-13794 for more details.
+ */ + lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount()); + + ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); + + SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried); + return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd); + } + + private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) + { + ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); + if (null != lastClustering) + filter = filter.forPaging(metadata.comparator, lastClustering, false); + + return SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + command.limits().forShortReadRetry(toQuery), + partitionKey, + filter, + command.indexMetadata()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org