Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/030ec1f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/030ec1f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/030ec1f0

Branch: refs/heads/trunk
Commit: 030ec1f056d0e0b9094ddf7fcd2a491cb8ddf621
Parents: 4809f42 2bae4ca
Author: Aleksey Yeschenko <alek...@yeschenko.com>
Authored: Wed Sep 20 17:47:32 2017 +0100
Committer: Aleksey Yeschenko <alek...@yeschenko.com>
Committed: Wed Sep 20 17:47:32 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 303 +++++++++++--------
 3 files changed, 187 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/CHANGES.txt
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5aecc9d,548de88..72a63f0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -2654,10 -2635,15 +2654,15 @@@ public class ColumnFamilyStore implemen
          if (keyspace == null)
              return null;
  
 -        UUID id = Schema.instance.getId(ksName, cfName);
 -        if (id == null)
 +        TableMetadata table = Schema.instance.getTableMetadata(ksName, cfName);
 +        if (table == null)
              return null;
  
 -        return keyspace.getColumnFamilyStore(id);
 +        return keyspace.getColumnFamilyStore(table.id);
      }
  
+ 
 -    public static TableMetrics metricsFor(UUID tableId)
++    public static TableMetrics metricsFor(TableId tableId)
+     {
+         return getIfExists(tableId).metric;
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/030ec1f0/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 98e3285,32fc015..b0741da
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,10 -27,7 +27,9 @@@ import com.google.common.collect.Iterab
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.schema.ColumnMetadata;
- import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.filter.DataLimits.Counter;
@@@ -88,26 -99,22 +101,19 @@@ public class DataResolver extends Respo
       * See CASSANDRA-13747 for more details.
       */
-         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
+         DataLimits.Counter mergedResultCounter =
+             command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
 
-         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
-         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
-         PartitionIterator counted = counter.applyTo(filtered);
+         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+         FilteredPartitions filtered =
+             FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+         PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
- 
-         return command.isForThrift()
-                ? counted
-                : Transformation.apply(counted, new EmptyPartitionsDiscarder());
+         return Transformation.apply(counted, new EmptyPartitionsDiscarder());
      }
 
-     public void compareResponses()
-     {
-         // We need to fully consume the results to trigger read repairs if appropriate
-         try (PartitionIterator iterator = resolve())
-         {
-             PartitionIterators.consume(iterator);
-         }
-     }
- 
      private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
                                                                       InetAddress[] sources,
-                                                                      DataLimits.Counter resultCounter)
+                                                                      DataLimits.Counter mergedResultCounter)
      {
          // If we have only one result, there is no read repair to do and we can't get short reads
          if (results.size() == 1)
@@@ -209,7 -227,7 +226,7 @@@
          // For each source, record if there is an open range to send as repair, and from where.
          private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
 
-         public MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed)
 -        private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
++        private MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed)
          {
              this.partitionKey = partitionKey;
              this.columns = columns;
@@@ -503,12 -523,16 +522,16 @@@
          private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
          {
-             final TableMetadata metadata;
-             final DecoratedKey partitionKey;
-             Clustering lastClustering;
-             int lastCount = 0;
 -            private final CFMetaData metadata;
++            private final TableMetadata metadata;
+             private final DecoratedKey partitionKey;
+ 
+             private Clustering lastClustering;
+ 
+             private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
+             private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
+             private int lastQueried = 0; // # extra rows requested from the replica last time
 
 -            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
 +            private ShortReadRowProtection(TableMetadata metadata, DecoratedKey partitionKey)
              {
                  this.metadata = metadata;
                  this.partitionKey = partitionKey;
@@@ -521,100 -545,141 +544,141 @@@
                  return row;
              }
 
-             @Override
+             /*
+              * We have a potential short read if the result from a given node contains the requested number of rows
+              * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
+              * made it into the final post-reconciliation result due to other nodes' tombstones.
+              *
+              * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+              * ultimately return fewer rows than required. Also, those additional rows may contain tombstones,
+              * which we also need to fetch as they may shadow rows from other replicas' results, which we would
+              * otherwise return incorrectly.
+              *
+              * Also note that we only get here once all the rows for this partition have been iterated over, and so
+              * if the node had returned the requested number of rows but we still get here, then some results were
+              * skipped during reconciliation.
+              */
              public UnfilteredRowIterator moreContents()
              {
-                 assert !postReconciliationCounter.isDoneForPartition();
- 
-                 // We have a short read if the node this is the result of has returned the requested number of
-                 // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
-                 // those results haven't made it in the final result post-reconciliation due to other nodes
-                 // tombstones. If that is the case, then the node might have more results that we should fetch
-                 // as otherwise we might return less results than required, or results that shouldn't be returned
-                 // (because the node has tombstone that hides future results from other nodes but that haven't
-                 // been returned due to the limit).
-                 // Also note that we only get here once all the results for this node have been returned, and so
-                 // if the node had returned the requested number but we still get there, it imply some results were
-                 // skipped during reconciliation.
-                 if (lastCount == counted(counter) || !counter.isDoneForPartition())
+                 // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
+                 assert !mergedResultCounter.isDoneForPartition();
+ 
+                 // we do not apply short read protection when we have no limits at all
+                 assert !command.limits().isUnlimited();
+ 
+                 // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more
+                 if (!singleResultCounter.isDoneForPartition())
                      return null;
 
-                 // clustering of the last row returned is empty, meaning that there is only one row per partition,
-                 // and we already have it.
-                 if (lastClustering == Clustering.EMPTY)
+                 /*
+                  * If the replica has no live rows in the partition, don't try to fetch more.
+                  *
+                  * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
+                  * always cover this scenario:
+                  * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
+                  * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
+                  *
+                  * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
+                  * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
+                  * have tombstones in the current partition.
+                  *
+                  * One other way we can hit this condition is when the partition only has a live static row and no regular
+                  * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
+                  * the moreContents() call.
+                  */
+                 if (countedInCurrentPartition(singleResultCounter) == 0)
                      return null;
 
-                 lastCount = counted(counter);
- 
-                 // We need to try to query enough additional results to fulfill our query, but because we could still
-                 // get short reads on that additional query, just querying the number of results we miss may not be
-                 // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only
-                 // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result.
-                 // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that
-                 // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
-                 // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
-                 // counting iterator.
-                 int n = countedInCurrentPartition(postReconciliationCounter);
-                 int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
- 
-                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
-                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                 ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                 SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                    command.nowInSec(),
-                                                                                    command.columnFilter(),
-                                                                                    command.rowFilter(),
-                                                                                    retryLimits,
-                                                                                    partitionKey,
-                                                                                    retryFilter);
- 
-                 Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
-                 Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
- 
-                 return doShortReadRetry(cmd);
-             }
+                 /*
+                  * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
+                  * We already have the row, so there is no point in asking for more from the partition.
+                  */
+                 if (Clustering.EMPTY == lastClustering)
+                     return null;
 
-             /**
-              * Returns the number of results counted by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted by the counter
-              */
-             private int counted(Counter counter)
-             {
-                 // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
-                 // groups.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCounted();
+                 lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
+                 lastCounted = countedInCurrentPartition(singleResultCounter);
+ 
+                 // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
+                 if (lastQueried > 0 && lastFetched < lastQueried)
+                     return null;
 
-                 return counter.counted();
+                 /*
+                  * At this point we know that:
+                  * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
+                  *    rows in the partition
+                  * 2. at least one of those returned rows was shadowed by a tombstone returned from another
+                  *    replica
+                  * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
+                  *    avoid a short read
+                  *
+                  * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
+                  * are defined as follows:
+                  * [a] limits.count() - mergedResultCounter.counted()
+                  * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                  *
+                  * It would be naive to query for exactly that many rows, as it's possible and not unlikely
+                  * that some of the returned rows would also be shadowed by tombstones from other hosts.
+                  *
+                  * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
+                  * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
+                  *
+                  * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
+                  * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
+                  * marginal cost for each extra row requested.
+                  *
+                  * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
+                  * time we want to respect paging limits and not blow up spectacularly.
+                  *
+                  * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
+                  * counts.
+                  *
+                  * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
+                  * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
+                  * for SELECT DISTINCT queries (that set per partition limit to 1).
+                  *
+                  * See CASSANDRA-13794 for more details.
+                  */
+                 lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+ 
 -                ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
++                ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
+ 
+                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
              }
 
-             /**
-              * Returns the number of results counted in the partition by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted in the partition by the counter
-              */
+             // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
              private int countedInCurrentPartition(Counter counter)
              {
-                 // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
-                 // the number of groups in the current partition.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCountedInCurrentPartition();
+                 return command.limits().isGroupByLimit()
+                      ? counter.rowCountedInCurrentPartition()
+                      : counter.countedInCurrentPartition();
+             }
 
-                 return counter.countedInCurrentPartition();
+             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+             {
+                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+                 if (null != lastClustering)
+                     filter = filter.forPaging(metadata.comparator, lastClustering, false);
+ 
+                 return SinglePartitionReadCommand.create(command.metadata(),
+                                                          command.nowInSec(),
+                                                          command.columnFilter(),
+                                                          command.rowFilter(),
+                                                          command.limits().forShortReadRetry(toQuery),
+                                                          partitionKey,
+                                                          filter);
              }
 
-             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
              {
-                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
-                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
+                 DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
                  if (StorageProxy.canDoLocalRequest(source))
-                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
                  else
-                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
 -                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
++                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler);
 
                  // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                  handler.awaitResults();
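
----------------------------------------------------------------------

As a side note, the retry sizing discussed in the comments above can be illustrated with a small self-contained sketch (hypothetical Java, not part of this commit): the pre-13794 code solved m * x/n = n - x for the number of extra rows m, while the new code simply requests the smaller of the two limits, bounded below by 8.

    // Hypothetical illustration of the two short-read-retry sizing heuristics;
    // class and method names are invented for this sketch.
    public final class ShortReadRetrySizing
    {
        // Old estimate: the replica returned n rows but only x survived reconciliation,
        // so the live/requested ratio is x/n; to recover the n - x missing rows we
        // solve m * x/n = n - x, i.e. m = (n^2 / x) - n, clamped to at least 1.
        static int oldEstimate(int n, int x)
        {
            return Math.max(((n * n) / Math.max(x, 1)) - n, 1);
        }

        // New heuristic (CASSANDRA-13794): favour fewer, larger requests. Take the
        // smaller of the global and per-partition limits, but never ask for fewer
        // than 8 rows, so SELECT DISTINCT (per-partition limit 1) is not fetched
        // row by row.
        static int newEstimate(int countLimit, int perPartitionLimit)
        {
            return Math.max(Math.min(countLimit, perPartitionLimit), 8);
        }

        public static void main(String[] args)
        {
            // Replica returned 100 rows, only 10 survived reconciliation:
            System.out.println(oldEstimate(100, 10)); // prints 900
            // LIMIT 100 with per-partition limit 1, as in SELECT DISTINCT:
            System.out.println(newEstimate(100, 1));  // prints 8
        }
    }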