Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c85749e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c85749e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c85749e2

Branch: refs/heads/cassandra-3.8
Commit: c85749e2bc5c65a03b00994565cd0b6b1a642e65
Parents: 2a828af e86d531
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Aug 12 15:34:01 2016 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Aug 12 15:37:13 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  | 49 +++++++++++++-------
 2 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3ce9f9e,ddc6720..959b967
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,39 -1,5 +1,40 @@@
-2.2.8
+3.0.9
+ * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
+ * Backport CASSANDRA-12002 (CASSANDRA-12177)
+ * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
+ * Fix potential bad messaging service message for paged range reads
+   within mixed-version 3.x clusters (CASSANDRA-12249)
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
+ * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
+ * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
+ * Fix upgrade of super columns on thrift (CASSANDRA-12335)
+ * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
+ * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
+ * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
+ * Lost counter writes in compact table and static columns (CASSANDRA-12219)
+ * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
+ * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
+ * Add option to override compaction space check (CASSANDRA-12180)
+ * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
+ * Respond with v1/v2 protocol header when responding to driver that attempts
+   to connect with too low of a protocol version (CASSANDRA-11464)
+ * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
+ * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
+ * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
+ * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
+ * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
+ * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
+ * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
+ * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
+ * Fix column ordering of results with static columns for Thrift requests in
+   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
+   those static columns in query results (CASSANDRA-12123)
+ * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
+ * Fix EOF exception when altering column type (CASSANDRA-11820)
+Merged from 2.2:
+ * Update StorageProxy range metrics for timeouts, failures and unavailables (CASSANDRA-9507)
  * Add Sigar to classes included in clientutil.jar (CASSANDRA-11635)
  * Add decay to histograms and timers used for metrics (CASSANDRA-11752)
  * Fix hanging stream session (CASSANDRA-10992)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c85749e2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 483da67,7b7979d..8a151f2
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1955,201 -1716,267 +1955,218 @@@ public class StorageProxy implements St
          }
      }
  
-     public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
-     throws UnavailableException, ReadFailureException, ReadTimeoutException
+     private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
      {
-         Tracing.trace("Computing ranges to query");
-         long startTime = System.nanoTime();
+         private final ReadCallback handler;
+         private PartitionIterator result;
  
-         Keyspace keyspace = Keyspace.open(command.keyspace);
-         List<Row> rows;
-         // now scan until we have enough results
-         try
+         private SingleRangeResponse(ReadCallback handler)
          {
-             int liveRowCount = 0;
-             boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
-             rows = new ArrayList<>();
+             this.handler = handler;
+         }
  
-             // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
-             // expensive in clusters with vnodes)
-             List<? extends AbstractBounds<RowPosition>> ranges;
-             if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
-                 ranges = command.keyRange.unwrap();
-             else
-                 ranges = getRestrictedRanges(command.keyRange);
-
-             // determine the number of rows to be fetched and the concurrency factor
-             int rowsToBeFetched = command.limit();
-             int concurrencyFactor;
-             if (command.requiresScanningAllRanges())
-             {
-                 // all nodes must be queried
-                 rowsToBeFetched *= ranges.size();
-                 concurrencyFactor = ranges.size();
-                 logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                              command.limit(),
-                              ranges.size(),
-                              concurrencyFactor);
-                 Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
-                               ranges.size(), concurrencyFactor);
+         private void waitForResponse() throws ReadTimeoutException
+         {
+             if (result != null)
+                 return;
+
+             try
+             {
+                 result = handler.get();
              }
-             else
+             catch (DigestMismatchException e)
              {
-                 // our estimate of how many result rows there will be per-range
-                 float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
-                 // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-                 // fetch enough rows in the first round
-                 resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-                 concurrencyFactor = resultRowsPerRange == 0.0
-                                   ? 1
-                                   : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-
-                 logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                              resultRowsPerRange,
-                              command.limit(),
-                              ranges.size(),
-                              concurrencyFactor);
-                 Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
-                               ranges.size(),
-                               concurrencyFactor,
-                               resultRowsPerRange);
-             }
-
-             boolean haveSufficientRows = false;
-             int i = 0;
-             AbstractBounds<RowPosition> nextRange = null;
-             List<InetAddress> nextEndpoints = null;
-             List<InetAddress> nextFilteredEndpoints = null;
-             while (i < ranges.size())
-             {
-                 List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
-                 int concurrentFetchStartingIndex = i;
-                 int concurrentRequests = 0;
-                 while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
-                 {
-                     AbstractBounds<RowPosition> range = nextRange == null
-                                                       ? ranges.get(i)
-                                                       : nextRange;
-                     List<InetAddress> liveEndpoints = nextEndpoints == null
-                                                     ? getLiveSortedEndpoints(keyspace, range.right)
-                                                     : nextEndpoints;
-                     List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
-                                                         ? consistency_level.filterForQuery(keyspace, liveEndpoints)
-                                                         : nextFilteredEndpoints;
-                     ++i;
-                     ++concurrentRequests;
-
-                     // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
-                     // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
-                     // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
-                     while (i < ranges.size())
-                     {
-                         nextRange = ranges.get(i);
-                         nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
-                         nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
-                         // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
-                         // don't know how to deal with a wrapping range.
-                         // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
-                         // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
-                         // wire compatibility, so It's likely easier not to bother;
-                         if (range.right.isMinimum())
-                             break;
-
-                         List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
-                         // Check if there is enough endpoint for the merge to be possible.
-                         if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
-                             break;
-
-                         List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
-                         // Estimate whether merging will be a win or not
-                         if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
-                             break;
-
-                         // If we get there, merge this range and the next one
-                         range = range.withNewRight(nextRange.right);
-                         liveEndpoints = merged;
-                         filteredEndpoints = filteredMerged;
-                         ++i;
-                     }
-
-                     AbstractRangeCommand nodeCmd = command.forSubRange(range);
+                 throw new AssertionError(e); // no digests in range slices yet
+             }
+         }
  
-                     // collect replies and resolve according to consistency level
-                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
-                     List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
-                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
-                     handler.assureSufficientLiveNodes();
-                     resolver.setSources(filteredEndpoints);
-                     if (filteredEndpoints.size() == 1
-                         && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
-                     {
-                         StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
-                     }
-                     else
-                     {
-                         MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
-                         for (InetAddress endpoint : filteredEndpoints)
-                         {
-                             Tracing.trace("Enqueuing request to {}", endpoint);
-                             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
-                         }
-                     }
-                     scanHandlers.add(Pair.create(nodeCmd, handler));
-                 }
-                 Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+         protected RowIterator computeNext()
+         {
+             waitForResponse();
+             return result.hasNext() ? result.next() : endOfData();
+         }
  
-                 List<AsyncOneResponse> repairResponses = new ArrayList<>();
-                 for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
-                 {
-                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
-                     RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
+         public void close()
+         {
+             if (result != null)
+                 result.close();
+         }
+     }
  
-                     try
-                     {
-                         for (Row row : handler.get())
-                         {
-                             rows.add(row);
-                             if (countLiveRows)
-                                 liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
-                         }
-                         repairResponses.addAll(resolver.repairResults);
-                     }
-                     catch (ReadTimeoutException|ReadFailureException ex)
-                     {
-                         // we timed out or failed waiting for responses
-                         int blockFor = consistency_level.blockFor(keyspace);
-                         int responseCount = resolver.responses.size();
-                         String gotData = responseCount > 0
-                                          ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
-                                          : "";
-
-                         boolean isTimeout = ex instanceof ReadTimeoutException;
-                         if (Tracing.isTracing())
-                         {
-                             Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
-                                           (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
-                         }
-                         else if (logger.isDebugEnabled())
-                         {
-                             logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
-                                          (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
-                         }
-                         throw ex;
-                     }
-                     catch (DigestMismatchException e)
-                     {
-                         throw new AssertionError(e); // no digests in range slices yet
-                     }
+     private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+     {
+         private final Iterator<RangeForQuery> ranges;
+         private final int totalRangeCount;
+         private final PartitionRangeReadCommand command;
+         private final Keyspace keyspace;
+         private final ConsistencyLevel consistency;
  
-                     // if we're done, great, otherwise, move to the next range
-                     int count = countLiveRows ? liveRowCount : rows.size();
-                     if (count >= rowsToBeFetched)
-                     {
-                         haveSufficientRows = true;
-                         break;
-                     }
-                 }
+         private final long startTime;
+         private DataLimits.Counter counter;
+         private PartitionIterator sentQueryIterator;
  
-                 try
-                 {
-                     FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
-                 }
-                 catch (TimeoutException ex)
-                 {
-                     // We got all responses, but timed out while repairing
-                     int blockFor = consistency_level.blockFor(keyspace);
-                     if (Tracing.isTracing())
-                         Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
-                     else
-                         logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
-                     throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
-                 }
+         private int concurrencyFactor;
+         // The two following "metric" are maintained to improve the concurrencyFactor
+         // when it was not good enough initially.
+         private int liveReturned;
+         private int rangesQueried;
  
-                 if (haveSufficientRows)
-                     return command.postReconciliationProcessing(rows);
+         public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+         {
+             this.command = command;
+             this.concurrencyFactor = concurrencyFactor;
+             this.startTime = System.nanoTime();
+             this.ranges = new RangeMerger(ranges, keyspace, consistency);
+             this.totalRangeCount = ranges.rangeCount();
+             this.consistency = consistency;
+             this.keyspace = keyspace;
+         }
  
-                 // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
-                 // based on the results we've seen so far (as long as we still have ranges left to query)
-                 if (i < ranges.size())
+         public RowIterator computeNext()
+         {
-             while (sentQueryIterator == null || !sentQueryIterator.hasNext())
++            try
+             {
-                 // If we don't have more range to handle, we're done
-                 if (!ranges.hasNext())
-                     return endOfData();
-
-                 // else, sends the next batch of concurrent queries (after having close the previous iterator)
-                 if (sentQueryIterator != null)
++                while (sentQueryIterator == null || !sentQueryIterator.hasNext())
                  {
-                     liveReturned += counter.counted();
-                     sentQueryIterator.close();
-                     float fetchedRows = countLiveRows ? liveRowCount : rows.size();
-                     float remainingRows = rowsToBeFetched - fetchedRows;
-                     float actualRowsPerRange;
-                     if (fetchedRows == 0.0)
-                     {
-                         // we haven't actually gotten any results, so query all remaining ranges at once
-                         actualRowsPerRange = 0.0f;
-                         concurrencyFactor = ranges.size() - i;
-                     }
-                     else
++                    // If we don't have more range to handle, we're done
++                    if (!ranges.hasNext())
++                        return endOfData();
+
-                     // It's not the first batch of queries and we're not done, so we we can use what has been
-                     // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
-                     updateConcurrencyFactor();
++                    // else, sends the next batch of concurrent queries (after having close the previous iterator)
++                    if (sentQueryIterator != null)
+                     {
-                         actualRowsPerRange = fetchedRows / i;
-                         concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
++                        liveReturned += counter.counted();
++                        sentQueryIterator.close();
++
++                        // It's not the first batch of queries and we're not done, so we we can use what has been
++                        // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
++                        updateConcurrencyFactor();
+                     }
-                     logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                                  actualRowsPerRange, (int) remainingRows, concurrencyFactor);
++                    sentQueryIterator = sendNextRequests();
                  }
-                 sentQueryIterator = sendNextRequests();
-             }
+
-             return sentQueryIterator.next();
++                return sentQueryIterator.next();
++            }
++            catch (UnavailableException e)
++            {
++                rangeMetrics.unavailables.mark();
++                throw e;
++            }
++            catch (ReadTimeoutException e)
++            {
++                rangeMetrics.timeouts.mark();
++                throw e;
++            }
++            catch (ReadFailureException e)
++            {
++                rangeMetrics.failures.mark();
++                throw e;
+             }
          }
-         catch (ReadTimeoutException e)
+
+         private void updateConcurrencyFactor()
          {
-             rangeMetrics.timeouts.mark();
-             throw e;
+             if (liveReturned == 0)
+             {
+                 // we haven't actually gotten any results, so query all remaining ranges at once
+                 concurrencyFactor = totalRangeCount - rangesQueried;
+                 return;
+             }
+
+             // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+             // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+             int remainingRows = command.limits().count() - liveReturned;
+             float rowsPerRange = (float)liveReturned / (float)rangesQueried;
+             concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+             logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                          rowsPerRange, (int) remainingRows, concurrencyFactor);
          }
-         catch (UnavailableException e)
+
+         private SingleRangeResponse query(RangeForQuery toQuery)
          {
-             rangeMetrics.unavailables.mark();
-             throw e;
+             PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+
+             DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+
+             int blockFor = consistency.blockFor(keyspace);
+             int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
+             List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
+             ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
+
+             handler.assureSufficientLiveNodes();
+
+             if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+             {
+                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
+             }
+             else
+             {
+                 for (InetAddress endpoint : toQuery.filteredEndpoints)
+                 {
+                     MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
+                     Tracing.trace("Enqueuing request to {}", endpoint);
+                     MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+                 }
+             }
+
+             return new SingleRangeResponse(handler);
          }
-         catch (ReadFailureException e)
+
+         private PartitionIterator sendNextRequests()
          {
-             rangeMetrics.failures.mark();
-             throw e;
+             List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+             for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+             {
+                 concurrentQueries.add(query(ranges.next()));
+                 ++rangesQueried;
+             }
+
+             Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+             // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+             // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE.
+             counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
+             return counter.applyTo(PartitionIterators.concat(concurrentQueries));
          }
-         finally
+
+         public void close()
          {
-             long latency = System.nanoTime() - startTime;
-             rangeMetrics.addNano(latency);
-             Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+             try
+             {
+                 if (sentQueryIterator != null)
+                     sentQueryIterator.close();
+             }
+             finally
+             {
+                 long latency = System.nanoTime() - startTime;
+                 rangeMetrics.addNano(latency);
+                 Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+             }
          }
-         return command.postReconciliationProcessing(rows);
+     }
+
+     @SuppressWarnings("resource")
+     public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
-     throws UnavailableException, ReadFailureException, ReadTimeoutException
+     {
+         Tracing.trace("Computing ranges to query");
+
+         Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+         RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
+
+         // our estimate of how many result rows there will be per-range
+         float resultsPerRange = estimateResultsPerRange(command, keyspace);
+         // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+         // fetch enough rows in the first round
+         resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+         int concurrencyFactor = resultsPerRange == 0.0
+                               ? 1
+                               : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+         logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                      resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
+         Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
+
+         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+
+         return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
      }
  
      public Map<String, List<String>> getSchemaVersions()
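----------------------------------------------------------------------

For readers skimming the diff: the functional change merged from 2.2 (CASSANDRA-9507) is the try/catch added around RangeCommandIterator.computeNext(), which marks rangeMetrics.unavailables, rangeMetrics.timeouts and rangeMetrics.failures before rethrowing, so range reads now update the same coordinator metrics that single-partition reads do. The adaptive batching around it is easy to lose in the merge noise, so below is a minimal, self-contained Java sketch of the heuristic that updateConcurrencyFactor() implements. The class name, method name and the standalone framing are illustrative assumptions for this note, not code from the patch; the real logic lives in StorageProxy.

// Sketch of the rows-per-range feedback loop used to size the next batch of
// concurrent sub-range queries. Inputs correspond to command.limits().count(),
// liveReturned, rangesQueried and totalRangeCount in the patched code.
public final class ConcurrencyFactorSketch
{
    static int nextConcurrencyFactor(int commandLimit, int liveReturned, int rangesQueried, int totalRangeCount)
    {
        // No live rows seen yet: the initial estimate was useless, so query
        // every remaining range at once (may be 0 if nothing remains).
        if (liveReturned == 0)
            return totalRangeCount - rangesQueried;

        // Otherwise extrapolate rows-per-range from what has been observed so
        // far and size the next batch to fetch all remaining rows in one go,
        // clamped to [1, number of ranges still unqueried].
        int remainingRows = commandLimit - liveReturned;
        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
        return Math.max(1, Math.min(totalRangeCount - rangesQueried,
                                    Math.round(remainingRows / rowsPerRange)));
    }

    public static void main(String[] args)
    {
        // Example: LIMIT 100, 10 live rows back after querying 5 of 128 ranges
        // -> ~2 rows per range observed, 90 rows still needed -> 45 ranges next.
        System.out.println(nextConcurrencyFactor(100, 10, 5, 128)); // prints 45
    }
}

The liveReturned == 0 branch mirrors the patch's design choice: when a batch returns nothing, it fans out to all remaining ranges rather than paying another round trip per batch of an estimate that has already proven wrong.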