Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bae4ca9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bae4ca9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bae4ca9 Branch: refs/heads/trunk Commit: 2bae4ca907ac4d2ab53c899e5cf5c9e4de631f52 Parents: c1efaf3 f93e6e3 Author: Aleksey Yeschenko <alek...@yeschenko.com> Authored: Wed Sep 20 17:41:07 2017 +0100 Committer: Aleksey Yeschenko <alek...@yeschenko.com> Committed: Wed Sep 20 17:41:07 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 5 + .../apache/cassandra/service/DataResolver.java | 304 +++++++++++-------- 3 files changed, 187 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 39270e5,07742ef..8d07cbc --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,15 -1,5 +1,16 @@@ -3.0.15 +3.11.1 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869) + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418) + * BTree.Builder memory leak (CASSANDRA-13754) + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798) + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938) + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744) + * "ignore" option is ignored in sstableloader (CASSANDRA-13721) + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652) + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512) + * Properly evict pstmts 
from prepared statements cache (CASSANDRA-13641) +Merged from 3.0: + * Improve short read protection performance (CASSANDRA-13794) * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787) * Fix short read protection for tables with no clustering columns (CASSANDRA-13880) * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DataResolver.java index 4b0bd3c,9a98ee5..32fc015 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@@ -43,12 -43,10 +43,12 @@@ public class DataResolver extends Respo { @VisibleForTesting final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + private final long queryStartNanoTime; - public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) - DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) ++ DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) { super(keyspace, command, consistency, maxResponseCount); + this.queryStartNanoTime = queryStartNanoTime; } public PartitionIterator getData() @@@ -122,10 -123,23 +125,23 @@@ if (!command.limits().isUnlimited()) { for (int i = 0; i < results.size(); i++) - results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime))); + { + DataLimits.Counter singleResultCounter 
= + command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); + + ShortReadResponseProtection protection = - new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter); ++ new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter, queryStartNanoTime); + + /* + * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition() + * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies + * its transformations, so that this order is preserved when calling applyToRows() too. + */ + results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter)); + } } - return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener); + return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources)); } private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener @@@ -209,9 -223,9 +225,9 @@@ // For each source, the time of the current deletion as known by the source. private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length]; // For each source, record if there is an open range to send as repair, and from where. 
- private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length]; + private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length]; - public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) + private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) { this.partitionKey = partitionKey; this.columns = columns; @@@ -471,19 -473,18 +487,24 @@@ } } - private class ShortReadProtection extends Transformation<UnfilteredRowIterator> + private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator> { private final InetAddress source; - private final DataLimits.Counter counter; - private final DataLimits.Counter postReconciliationCounter; + + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + - private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter) + private final long queryStartNanoTime; + - private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime) ++ private ShortReadResponseProtection(InetAddress source, ++ DataLimits.Counter singleResultCounter, ++ DataLimits.Counter mergedResultCounter, ++ long queryStartNanoTime) { this.source = source; - this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); - this.postReconciliationCounter = postReconciliationCounter; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.queryStartNanoTime = queryStartNanoTime; } @Override @@@ -523,100 -525,133 +545,141 @@@ return row; } - @Override + /* + * We have a potential short read if the result from a given node contains the requested number of rows + * for that partition (i.e. 
it has stopped returning results due to the limit), but some of them haven't + * made it into the final post-reconciliation result due to other nodes' tombstones. + * + * If that is the case, then that node may have more rows that we should fetch, as otherwise we could + * ultimately return fewer rows than required. Also, those additional rows may contain tombstones + * which we also need to fetch as they may shadow rows from other replicas' results, which we would + * otherwise return incorrectly. + * + * Also note that we only get here once all the rows for this partition have been iterated over, and so + * if the node had returned the requested number of rows but we still get here, then some results were + * skipped during reconciliation. + */ public UnfilteredRowIterator moreContents() { - assert !postReconciliationCounter.isDoneForPartition(); - - // We have a short read if the node this is the result of has returned the requested number of - // rows for that partition (i.e. it has stopped returning results due to the limit), but some of - // those results haven't made it in the final result post-reconciliation due to other nodes - // tombstones. If that is the case, then the node might have more results that we should fetch - // as otherwise we might return less results than required, or results that shouldn't be returned - // (because the node has tombstone that hides future results from other nodes but that haven't - // been returned due to the limit). - // Also note that we only get here once all the results for this node have been returned, and so - // if the node had returned the requested number but we still get there, it imply some results were - // skipped during reconciliation.
- if (lastCount == counted(counter) || !counter.isDoneForPartition()) + // never try to request additional rows from replicas if our reconciled partition is already filled to the limit + assert !mergedResultCounter.isDoneForPartition(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more + if (!singleResultCounter.isDoneForPartition()) return null; - // clustering of the last row returned is empty, meaning that there is only one row per partition, - // and we already have it. - if (lastClustering == Clustering.EMPTY) + /* + * If the replica has no live rows in the partition, don't try to fetch more. + * + * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't + * always cover this scenario: + * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], + * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. + * + * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch + * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only + * have tombstones in the current partition. + * + * One other way we can hit this condition is when the partition only has a live static row and no regular + * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after + * the moreContents() call. 
+ */ - if (singleResultCounter.countedInCurrentPartition() == 0) ++ if (countedInCurrentPartition(singleResultCounter) == 0) return null; - lastCount = counted(counter); - - // We need to try to query enough additional results to fulfill our query, but because we could still - // get short reads on that additional query, just querying the number of results we miss may not be - // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only - // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result. - // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that - // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n. - // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a - // counting iterator. - int n = countedInCurrentPartition(postReconciliationCounter); - int x = countedInCurrentPartition(counter); - int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1); - - DataLimits retryLimits = command.limits().forShortReadRetry(toQuery); - ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); - ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false); - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), - command.nowInSec(), - command.columnFilter(), - command.rowFilter(), - retryLimits, - partitionKey, - retryFilter); - - Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); - Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark(); - - return doShortReadRetry(cmd); - } + /* + * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. 
+ * We already have the row, so there is no point in asking for more from the partition. + */ + if (Clustering.EMPTY == lastClustering) + return null; - /** - * Returns the number of results counted by the counter. - * - * @param counter the counter. - * @return the number of results counted by the counter - */ - private int counted(Counter counter) - { - // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of - // groups. - if (command.limits().isGroupByLimit()) - return counter.rowCounted(); - lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted; - lastCounted = singleResultCounter.countedInCurrentPartition(); ++ lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted; ++ lastCounted = countedInCurrentPartition(singleResultCounter); + + // getting back fewer rows than we asked for means the partition on the replica has been fully consumed + if (lastQueried > 0 && lastFetched < lastQueried) + return null; - return counter.counted(); + /* + * At this point we know that: + * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more + * rows in the partition + * 2. at least one of those returned rows was shadowed by a tombstone returned from another + * replica + * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to + * avoid a short read + * + * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b + * are defined as follows: + * [a] limits.count() - mergedResultCounter.counted() + * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() + * + * It would be naive to query for exactly that many rows, as it's possible and not unlikely + * that some of the returned rows would also be shadowed by tombstones from other hosts. 
+ * + * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; + * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. + * + * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number + * of rows fetched: there is a high transactional cost for every individual request, but a relatively low + * marginal cost for each extra row requested. + * + * As such it's better to overfetch than to underfetch extra rows from a host; but at the same + * time we want to respect paging limits and not blow up spectacularly. + * + * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only + * counts. + * + * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits, + * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row + * for SELECT DISTINCT queries (that set per partition limit to 1). + * + * See CASSANDRA-13794 for more details. + */ + lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8); + + ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); + + return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried)); } - /** - * Returns the number of results counted in the partition by the counter. - * - * @param counter the counter. - * @return the number of results counted in the partition by the counter - */ ++ // Returns the number of rows counted in the current partition; for GROUP BY queries the counter's own count is in groups, so its row count is used instead + private int countedInCurrentPartition(Counter counter) + { - // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns - // the number of groups in the current partition.
- if (command.limits().isGroupByLimit()) - return counter.rowCountedInCurrentPartition(); ++ return command.limits().isGroupByLimit() ++ ? counter.rowCountedInCurrentPartition() ++ : counter.countedInCurrentPartition(); ++ } + - return counter.countedInCurrentPartition(); + private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) + { + ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); + if (null != lastClustering) + filter = filter.forPaging(metadata.comparator, lastClustering, false); + + return SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + command.limits().forShortReadRetry(toQuery), + partitionKey, + filter); } - private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand) + private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd) { - DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime); - ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime); - DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1); - ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source)); ++ DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime); ++ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime); + if (StorageProxy.canDoLocalRequest(source)) - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); else - 
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); + MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler); // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. handler.awaitResults(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org