Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 95839aae2 -> 68a67469f
Implement short read protection on partition boundaries

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-13595


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68a67469
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68a67469
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68a67469

Branch: refs/heads/cassandra-3.0
Commit: 68a67469f8b25534d086b29b8fe0fa4ec3f9d1ec
Parents: 1efdf33
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Thu Sep 21 14:29:05 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Sat Sep 30 10:33:30 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  14 ++
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +
 .../db/SinglePartitionReadCommand.java          |   5 +
 .../apache/cassandra/db/filter/DataLimits.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  | 216 ++++++++++++++-----
 6 files changed, 180 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c53aa5..c5e54d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Implement short read protection on partition boundaries (CASSANDRA-13595)
  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
  * Filter header only commit logs before recovery (CASSANDRA-13918)
  * AssertionError prepending to a list (CASSANDRA-13149)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 9e557e0..84e3c7d 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.BaseRowIterator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -400,6 +401,19 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DataRange.serializer.serializedSize(dataRange(), version, metadata());
     }
 
+    /*
+     * We are currently using PartitionRangeReadCommand for most index queries, even if they are explicitly restricted
+     * to a single partition key. Return true if that is the case.
+     *
+     * See CASSANDRA-11617 and CASSANDRA-11872 for details.
+     */
+    public boolean isLimitedToOnePartition()
+    {
+        return dataRange.keyRange instanceof Bounds
+            && dataRange.startKey().kind() == PartitionPosition.Kind.ROW_KEY
+            && dataRange.startKey().equals(dataRange.stopKey());
+    }
+
     private static class Deserializer extends SelectionDeserializer
     {
         public ReadCommand deserialize(DataInputPlus in,
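
An aside on the bounds check above: in the dht package, Bounds is inclusive at both ends, while Range and ExcludingBounds exclude at least one endpoint, so only a Bounds whose start and stop decorate the same concrete key (kind() == ROW_KEY, as opposed to a token boundary) can have been produced by an index query pinned to a single partition. The following self-contained sketch models that check with simplified stand-in types; it is illustrative only, does not use the real org.apache.cassandra.dht classes, and omits the ROW_KEY test since its toy keys are always concrete:

    // SingleKeyRangeCheck.java -- illustrative model only; Key, KeyRange, Bounds and
    // ExcludingBounds are simplified stand-ins for the org.apache.cassandra.dht types.
    public class SingleKeyRangeCheck
    {
        static class Key
        {
            final String value;
            Key(String value) { this.value = value; }
            @Override public boolean equals(Object o) { return o instanceof Key && ((Key) o).value.equals(value); }
            @Override public int hashCode() { return value.hashCode(); }
        }

        static class KeyRange { final Key left, right; KeyRange(Key l, Key r) { left = l; right = r; } }
        static class Bounds extends KeyRange { Bounds(Key l, Key r) { super(l, r); } }                   // [left, right]
        static class ExcludingBounds extends KeyRange { ExcludingBounds(Key l, Key r) { super(l, r); } } // (left, right)

        // Mirrors the spirit of isLimitedToOnePartition(): an inclusive-inclusive range
        // that starts and stops on the same key selects exactly one partition.
        static boolean isLimitedToOnePartition(KeyRange range)
        {
            return range instanceof Bounds && range.left.equals(range.right);
        }

        public static void main(String[] args)
        {
            Key k1 = new Key("k1"), k2 = new Key("k2");
            System.out.println(isLimitedToOnePartition(new Bounds(k1, k1)));          // true
            System.out.println(isLimitedToOnePartition(new Bounds(k1, k2)));          // false
            System.out.println(isLimitedToOnePartition(new ExcludingBounds(k1, k1))); // false
        }
    }
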

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 160b104..2d399d8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -170,6 +170,8 @@ public abstract class ReadCommand implements ReadQuery
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
     protected abstract long selectionSerializedSize(int version);
 
+    public abstract boolean isLimitedToOnePartition();
+
     /**
      * The metadata for the table queried.
      *


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7a66eca..4b10530 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -1096,6 +1096,11 @@ public class SinglePartitionReadCommand extends ReadCommand
                + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
     }
 
+    public boolean isLimitedToOnePartition()
+    {
+        return true;
+    }
+
     /**
      * Groups multiple single partition read commands.
      */


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 6b74293..4c57a76 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -312,10 +312,7 @@ public abstract class DataLimits
 
         public DataLimits forShortReadRetry(int toFetch)
         {
-            // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
-            // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
-            // for both argument or just for perPartitionLimit with no limit on rowLimit).
-            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
+            return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
         }
 
         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
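
Note the semantic change in forShortReadRetry() above: the deleted comment's assumption - that a retry only ever re-queries the one partition that came up short - no longer holds once retries can cross partition boundaries, so the per-partition limit is now carried over instead of being dropped. A toy model of the old versus new behaviour (CqlLimits below is a hypothetical stand-in for DataLimits.CQLLimits, values chosen purely for illustration):

    // ShortReadRetryLimits.java -- toy model; CqlLimits stands in for DataLimits.CQLLimits.
    public class ShortReadRetryLimits
    {
        static final int NO_LIMIT = Integer.MAX_VALUE;

        static class CqlLimits
        {
            final int rowLimit, perPartitionLimit;
            CqlLimits(int rowLimit, int perPartitionLimit)
            {
                this.rowLimit = rowLimit;
                this.perPartitionLimit = perPartitionLimit;
            }
            @Override public String toString() { return "rowLimit=" + rowLimit + ", perPartitionLimit=" + perPartitionLimit; }
        }

        // Pre-13595 behaviour: a retry only ever re-queried a single short partition,
        // so the per-partition limit was discarded.
        static CqlLimits forShortReadRetryOld(CqlLimits limits, int toFetch)
        {
            return new CqlLimits(toFetch, NO_LIMIT);
        }

        // Post-13595 behaviour: a retry may cross partition boundaries, so the
        // original per-partition limit must still be enforced.
        static CqlLimits forShortReadRetryNew(CqlLimits limits, int toFetch)
        {
            return new CqlLimits(toFetch, limits.perPartitionLimit);
        }

        public static void main(String[] args)
        {
            CqlLimits original = new CqlLimits(100, 10); // e.g. LIMIT 100 PER PARTITION LIMIT 10
            System.out.println(forShortReadRetryOld(original, 25)); // rowLimit=25, perPartitionLimit=2147483647
            System.out.println(forShortReadRetryNew(original, 25)); // rowLimit=25, perPartitionLimit=10
        }
    }
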

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 7d8ffc5..5fb34c6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -34,6 +34,9 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -91,7 +94,7 @@ public class DataResolver extends ResponseResolver
      * have more rows than the client requested. To make sure that we still conform to the original limit,
      * we apply a top-level post-reconciliation counter to the merged partition iterator.
      *
-     * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+     * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
      * to the current partition to work. For this reason we have to apply the counter transformation before
      * empty partition discard logic kicks in - for it will eagerly consume the iterator.
      *
@@ -121,26 +124,13 @@ public class DataResolver extends ResponseResolver
         if (results.size() == 1)
             return results.get(0);
 
-        // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit,
-        // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother.
+        /*
+         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
+         * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+         */
         if (!command.limits().isUnlimited())
-        {
             for (int i = 0; i < results.size(); i++)
-            {
-                DataLimits.Counter singleResultCounter =
-                    command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
-
-                ShortReadResponseProtection protection =
-                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
-
-                /*
-                 * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
-                 * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
-                 * its transformations, so that this order is preserved when calling applyToRows() too.
-                 */
-                results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
-            }
-        }
+                results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
 
         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
     }
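
Why the per-source protection is needed at all: at a consistency level above ONE with LIMIT n, one replica may stop at exactly n rows while another replica's tombstones erase some of them during reconciliation, leaving the merged result short even though the first replica still has live rows to offer. A self-contained simulation of that failure mode (strings stand in for rows and a set of names stands in for tombstones; none of this is Cassandra API):

    // ShortReadDemo.java -- minimal simulation of a short read; strings stand in for
    // rows and a set of shadowed names stands in for another replica's tombstones.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ShortReadDemo
    {
        public static void main(String[] args)
        {
            int limit = 2;

            // Replica A truncated its answer to the query limit...
            List<String> fromA = Arrays.asList("a", "b");

            // ...but replica B holds a tombstone shadowing one of the returned rows.
            Set<String> tombstonedOnB = new HashSet<>(Arrays.asList("a"));

            List<String> reconciled = new ArrayList<>();
            for (String row : fromA)
                if (!tombstonedOnB.contains(row))
                    reconciled.add(row);

            // A hit its limit, yet the merged result is under the limit: a short read.
            boolean shortRead = fromA.size() == limit && reconciled.size() < limit;
            System.out.println("reconciled = " + reconciled + ", short read = " + shortRead); // [b], true
        }
    }
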
@@ -476,14 +466,60 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
+    private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
+                                                                      InetAddress source,
+                                                                      DataLimits.Counter mergedResultCounter)
+    {
+        DataLimits.Counter singleResultCounter =
+            command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
+
+        ShortReadPartitionsProtection protection =
+            new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter);
+
+        /*
+         * The order of extension and transformations is important here. Extending with more partitions has to happen
+         * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
+         * be called on the first partition of the extended iterator.
+         *
+         * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
+         * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same
+         * order when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
+         *
+         * See ShortReadPartitionsProtection.applyToPartition() for more details.
+         */
+
+        // extend with moreContents() only if it's a range read command with no partition key specified
+        if (!command.isLimitedToOnePartition())
+            partitions = MorePartitions.extend(partitions, protection);     // register SRPP.moreContents()
+
+        partitions = Transformation.apply(partitions, protection);          // register SRPP.applyToPartition()
+        partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
+
+        return partitions;
+    }
+
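
The ordering constraint described in the comment above can be reproduced with any lazily applied transformation: transform first and extend second, and the elements added by the extension bypass the transformation; extend first and transform second, and everything is covered. A minimal sketch using java.util.stream in place of the Transformation/MorePartitions machinery (the pitfall, not the API, is the point here):

    // OrderingDemo.java -- why extension must happen before transformation.
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class OrderingDemo
    {
        public static void main(String[] args)
        {
            List<String> initial = Arrays.asList("p1", "p2"); // the original response
            List<String> more = Arrays.asList("p3");          // partitions added by moreContents()

            // Wrong order: the "counter" transformation is applied before extending,
            // so the appended partition is never counted.
            List<String> wrong = Stream.concat(initial.stream().map(p -> p + ":counted"), more.stream())
                                       .collect(Collectors.toList());

            // Right order: extend first, then transform, so every partition is counted.
            List<String> right = Stream.concat(initial.stream(), more.stream())
                                       .map(p -> p + ":counted")
                                       .collect(Collectors.toList());

            System.out.println(wrong); // [p1:counted, p2:counted, p3]
            System.out.println(right); // [p1:counted, p2:counted, p3:counted]
        }
    }
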
+    /*
+     * We have a potential short read if the result from a given node contains the requested number of rows
+     * (i.e. it has stopped returning results due to the limit), but some of them haven't
+     * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
+     *
+     * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+     * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
+     * we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would
+     * otherwise return incorrectly.
+     */
+    private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
     {
         private final InetAddress source;
 
         private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
         private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 
-        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+        private DecoratedKey lastPartitionKey; // key of the last observed partition
+
+        private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+        private ShortReadPartitionsProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
         {
             this.source = source;
             this.singleResultCounter = singleResultCounter;
@@ -493,29 +529,100 @@ public class DataResolver extends ResponseResolver
         @Override
         public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
-            ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
+            partitionsFetched = true;
+
+            lastPartitionKey = partition.partitionKey();
 
             /*
-             * Extend for moreContents() then apply protection to track lastClustering.
+             * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
              *
              * If we don't apply the transformation *after* extending the partition with MoreRows,
              * applyToRow() method of protection will not be called on the first row of the new extension iterator.
              */
+            ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey());
             return Transformation.apply(MoreRows.extend(partition, protection), protection);
         }
 
-        private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+        /*
+         * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+         * if the node had returned the requested number of rows but we still get here, then some results were
+         * skipped during reconciliation.
+         */
+        public UnfilteredPartitionIterator moreContents()
+        {
+            // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+            assert !mergedResultCounter.isDone();
+
+            // we do not apply short read protection when we have no limits at all
+            assert !command.limits().isUnlimited();
+
+            /*
+             * If this is a single partition read command or an (indexed) partition range read command with
+             * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+             */
+            assert !command.isLimitedToOnePartition();
+
+            /*
+             * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+             *
+             * We can only take this shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+             * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+             */
+            if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+                return null;
+
+            /*
+             * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+             * There is no point in asking the replica for more rows - it has no more in the requested range.
+             */
+            if (!partitionsFetched)
+                return null;
+            partitionsFetched = false;
+
+            /*
+             * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+             * The row limit will be set either to the per-partition limit - if the command has no total row limit - or
+             * to the total # of rows remaining, if it has one. If we don't grab enough rows in some of the partitions,
+             * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+             */
+            int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+                        ? command.limits().count() - mergedResultCounter.counted()
+                        : command.limits().perPartitionCount();
+
+            ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
+            Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+
+            PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+            return executeReadCommand(cmd);
+        }
+
+        private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+        {
+            PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+
+            DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+
+            AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+            AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+                                                        ? new Range<>(lastPartitionKey, bounds.right)
+                                                        : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+            DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+
+            return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+        }
+
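
The logic above boils down to two questions - whether to go back to the replica at all, and how many rows to ask for - while makeFetchAdditionalPartitionReadCommand() narrows the key range so that the retry resumes immediately after the last partition seen, excluding it on the left and preserving the right bound together with its inclusiveness (Range for an inclusive right end, ExcludingBounds otherwise). A condensed, hypothetical model of those two decisions (not the real API; NO_LIMIT approximated with Integer.MAX_VALUE):

    // PartitionRetrySketch.java -- condensed model of the moreContents() decision
    // making shown above; all names here are illustrative, not the real API.
    public class PartitionRetrySketch
    {
        static final int NO_LIMIT = Integer.MAX_VALUE;

        // Should we ask the replica for more partitions at all?
        static boolean shouldRetry(boolean sourceCounterDone, int perPartitionLimit, boolean partitionsFetched)
        {
            // The source didn't even reach the original limit, and with no per-partition
            // limit no rows can go unaccounted for: it has nothing more in the range.
            if (!sourceCounterDone && perPartitionLimit == NO_LIMIT)
                return false;
            // If the last fetch came back empty, the replica is exhausted for this range.
            return partitionsFetched;
        }

        // How many rows to ask for on the retry.
        static int toQuery(int rowLimit, int mergedCounted, int perPartitionLimit)
        {
            return rowLimit != NO_LIMIT
                 ? rowLimit - mergedCounted // rows still missing from the merged result
                 : perPartitionLimit;       // no global limit: fetch a partition's worth
        }

        public static void main(String[] args)
        {
            // LIMIT 10, 6 rows survived reconciliation so far -> ask for 4 more rows,
            // in the sub-range starting just past lastPartitionKey.
            System.out.println(shouldRetry(true, NO_LIMIT, true)); // true
            System.out.println(toQuery(10, 6, NO_LIMIT));          // 4
        }
    }
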
+        private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
             private final CFMetaData metadata;
             private final DecoratedKey partitionKey;
 
-            private Clustering lastClustering;
+            private Clustering lastClustering; // clustering of the last observed row
 
             private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
             private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
             private int lastQueried = 0; // # extra rows requested from the replica last time
 
-            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
+            private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
                 this.metadata = metadata;
                 this.partitionKey = partitionKey;
@@ -529,18 +636,9 @@ public class DataResolver extends ResponseResolver
             }
 
             /*
-             * We have a potential short read if the result from a given node contains the requested number of rows
-             * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
-             * made it into the final post-reconciliation result due to other nodes' tombstones.
-             *
-             * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
-             * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
-             * which we also need to fetch as they may shadow rows from other replicas' results, which we would
-             * otherwise return incorrectly.
-             *
-             * Also note that we only get here once all the rows for this partition have been iterated over, and so
-             * if the node had returned the requested number of rows but we still get here, then some results were
-             * skipped during reconciliation.
+             * We only get here once all the rows in this iterator have been iterated over, and so if the node
+             * had returned the requested number of rows but we still get here, then some results were skipped
+             * during reconciliation.
              */
             public UnfilteredRowIterator moreContents()
             {
@@ -622,18 +720,17 @@ public class DataResolver extends ResponseResolver
                  * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only
                  * counts.
                  *
-                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
-                 * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
-                 * for SELECT DISTINCT queries (that set per partition limit to 1).
+                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
                  *
                  * See CASSANDRA-13794 for more details.
                  */
-                lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+                lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 
                 ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
                 Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 
-                return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+                SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+                return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
             }
 
             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
@@ -649,24 +746,25 @@
                                                          command.rowFilter(),
                                                          command.limits().forShortReadRetry(toQuery),
                                                          partitionKey,
-                                                         filter);
+                                                         filter,
+                                                         command.indexMetadata());
             }
+        }
 
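
One more behavioural change hides in the hunk above: the 8-row floor on row-level retries is gone, so a query with a per-partition limit of 1 (the SELECT DISTINCT case) now fetches exactly one row per retry rather than eight. A two-line worked comparison, with values picked purely for illustration:

    // RowRetrySize.java -- old vs new lastQueried computation for a LIMIT 100 query
    // with PER PARTITION LIMIT 1 (the SELECT DISTINCT case).
    public class RowRetrySize
    {
        public static void main(String[] args)
        {
            int count = 100, perPartitionCount = 1;
            int oldLastQueried = Math.max(Math.min(count, perPartitionCount), 8); // 8
            int newLastQueried = Math.min(count, perPartitionCount);              // 1
            System.out.println(oldLastQueried + " -> " + newLastQueried);
        }
    }
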
-            private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
-            {
-                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
-
-                if (StorageProxy.canDoLocalRequest(source))
-                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
-                else
-                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
-
-                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
-                handler.awaitResults();
-                assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
-            }
+        private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+        {
+            DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+            ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+            if (StorageProxy.canDoLocalRequest(source))
+                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+            else
+                MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+
+            // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+            handler.awaitResults();
+            assert resolver.responses.size() == 1;
+            return resolver.responses.get(0).payload.makeIterator(command);
+        }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org