http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DigestResolver.java index 572df6f,0000000..62b4538 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/DigestResolver.java @@@ -1,98 -1,0 +1,98 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.net.MessageIn; + +public class DigestResolver extends ResponseResolver +{ + private volatile ReadResponse dataResponse; + + public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + { + super(keyspace, command, consistency, maxResponseCount); + } + + @Override + public void preprocess(MessageIn<ReadResponse> message) + { + super.preprocess(message); + if (dataResponse == null && !message.payload.isDigestResponse()) + dataResponse = message.payload; + } + + /** + * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground + */ + public PartitionIterator getData() + { + assert isDataPresent(); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); + } + + /* + * This method handles two different scenarios: + * + * a) we're handling the initial read of data from the closest replica + digests + * from the rest. In this case we check the digests against each other, + * throw an exception if there is a mismatch, otherwise return the data row. + * + * b) we're checking additional digests that arrived after the minimum to handle + * the requested ConsistencyLevel, i.e. asynchronous read repair check + */ + public PartitionIterator resolve() throws DigestMismatchException + { + if (responses.size() == 1) + return getData(); + - if (logger.isDebugEnabled()) - logger.debug("resolving {} responses", responses.size()); ++ if (logger.isTraceEnabled()) ++ logger.trace("resolving {} responses", responses.size()); + + long start = System.nanoTime(); + + // validate digests against each other; throw immediately on mismatch. + ByteBuffer digest = null; + for (MessageIn<ReadResponse> message : responses) + { + ReadResponse response = message.payload; + + ByteBuffer newDigest = response.digest(command.metadata(), command); + if (digest == null) + digest = newDigest; + else if (!digest.equals(newDigest)) + // rely on the fact that only single partition queries use digests + throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest); + } + - if (logger.isDebugEnabled()) - logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); ++ if (logger.isTraceEnabled()) ++ logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); + } + + public boolean isDataPresent() + { + return dataResponse != null; + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/GCInspector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ReadCallback.java index 8b1ef32,145679d..8747004 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@@ -108,46 -102,31 +108,46 @@@ public class ReadCallback implements IA } } - public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException + public void awaitResults() throws ReadFailureException, ReadTimeoutException { - if (!await(command.getTimeout(), TimeUnit.MILLISECONDS)) + boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS); + boolean failed = blockfor + failures > endpoints.size(); + if (signaled && !failed) + return; + + if (Tracing.isTracing()) { - // Same as for writes, see AbstractWriteResponseHandler - ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); - Tracing.trace("Read timeout: {}", ex.toString()); - if (logger.isTraceEnabled()) - logger.trace("Read timeout: {}", ex.toString()); - throw ex; + String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; + Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); } - - if (blockfor + failures > endpoints.size()) + else if (logger.isDebugEnabled()) { - ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent()); - - if (logger.isTraceEnabled()) - logger.trace("Read failure: {}", ex.toString()); - throw ex; + String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; + logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); } - return blockfor == 1 ? resolver.getData() : resolver.resolve(); + // Same as for writes, see AbstractWriteResponseHandler + throw failed + ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent()) + : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); } - public void response(MessageIn<TMessage> message) + public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException + { + awaitResults(); + + PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve(); - if (logger.isDebugEnabled()) - logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); ++ if (logger.isTraceEnabled()) ++ logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + return result; + } + + public int blockFor() + { + return blockfor; + } + + public void response(MessageIn<ReadResponse> message) { resolver.preprocess(message); int n = waitingFor(message.from) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index d209af6,af56c3a..5c94f08 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -819,86 -703,59 +819,86 @@@ public class StorageProxy implements St } } - private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid) + public static boolean canDoLocalRequest(InetAddress replica) + { + return replica.equals(FBUtilities.getBroadcastAddress()); + } + + private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid) + throws WriteTimeoutException, WriteFailureException + { + WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all, + Collections.<InetAddress>emptyList(), + endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO, + Keyspace.open(SystemKeyspace.NAME), + null, + WriteType.BATCH_LOG); + + Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations); + + if (!endpoints.current.isEmpty()) + syncWriteToBatchlog(handler, batch, endpoints.current); + + if (!endpoints.legacy.isEmpty()) + LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy); + + handler.get(); + } + + private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints) throws WriteTimeoutException, WriteFailureException { - AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, - Collections.<InetAddress>emptyList(), - ConsistencyLevel.ONE, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.BATCH_LOG); + MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer); - MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) - .createMessage(); for (InetAddress target : endpoints) { - logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); - int targetVersion = MessagingService.instance().getVersion(target); - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - { - insertLocal(message.payload, handler); - } - else if (targetVersion == MessagingService.current_version) - { - MessagingService.instance().sendRR(message, target, handler, false); - } ++ logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size()); + + if (canDoLocalRequest(target)) + performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler); else - { - MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) - .createMessage(), - target, - handler, - false); - } + MessagingService.instance().sendRR(message, target, handler); } + } - handler.get(); + private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid) + { + if (!endpoints.current.isEmpty()) + asyncRemoveFromBatchlog(endpoints.current, uuid); + + if (!endpoints.legacy.isEmpty()) + LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid); } private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid) { - AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints, - Collections.<InetAddress>emptyList(), - ConsistencyLevel.ANY, - Keyspace.open(SystemKeyspace.NAME), - null, - WriteType.SIMPLE); - Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid)); - mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); - MessageOut<Mutation> message = mutation.createMessage(); + MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer); for (InetAddress target : endpoints) { - if (logger.isDebugEnabled()) - logger.debug("Sending batchlog remove request {} to {}", uuid, target); - if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS) - insertLocal(message.payload, handler); ++ if (logger.isTraceEnabled()) ++ logger.trace("Sending batchlog remove request {} to {}", uuid, target); + + if (canDoLocalRequest(target)) + performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid)); else - MessagingService.instance().sendRR(message, target, handler, false); + MessagingService.instance().sendOneWay(message, target); + } + } + + private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) + { + for (WriteResponseHandlerWrapper wrapper : wrappers) + { + Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + + try + { + sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); + } + catch (OverloadedException | WriteTimeoutException e) + { + wrapper.handler.onFailure(FBUtilities.getBroadcastAddress()); + } } } @@@ -1522,144 -1347,218 +1522,144 @@@ * 4. If the digests (if any) match the data return the data * 5. else carry out read repair by getting data from all the nodes. */ - private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel) + private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel) throws UnavailableException, ReadFailureException, ReadTimeoutException { - List<Row> rows = new ArrayList<>(initialCommands.size()); - // (avoid allocating a new list in the common case of nothing-to-retry) - List<ReadCommand> commandsToRetry = Collections.emptyList(); + int cmdCount = commands.size(); - do - { - List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry; - AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()]; + SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount]; + for (int i = 0; i < cmdCount; i++) + reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel); - if (!commandsToRetry.isEmpty()) - Tracing.trace("Retrying {} commands", commandsToRetry.size()); + for (int i = 0; i < cmdCount; i++) + reads[i].doInitialQueries(); - // send out read requests - for (int i = 0; i < commands.size(); i++) - { - ReadCommand command = commands.get(i); - assert !command.isDigestQuery(); + for (int i = 0; i < cmdCount; i++) + reads[i].maybeTryAdditionalReplicas(); - AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel); - exec.executeAsync(); - readExecutors[i] = exec; - } + for (int i = 0; i < cmdCount; i++) + reads[i].awaitResultsAndRetryOnDigestMismatch(); - for (AbstractReadExecutor exec : readExecutors) - exec.maybeTryAdditionalReplicas(); + for (int i = 0; i < cmdCount; i++) + if (!reads[i].isDone()) + reads[i].maybeAwaitFullDataRead(); - // read results and make a second pass for any digest mismatches - List<ReadCommand> repairCommands = null; - List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null; - for (AbstractReadExecutor exec: readExecutors) - { - try - { - Row row = exec.get(); - if (row != null) - { - row = exec.command.maybeTrim(row); - rows.add(row); - } + List<PartitionIterator> results = new ArrayList<>(cmdCount); + for (int i = 0; i < cmdCount; i++) + { + assert reads[i].isDone(); + results.add(reads[i].getResult()); + } - if (logger.isTraceEnabled()) - logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start)); - } - catch (ReadTimeoutException|ReadFailureException ex) - { - int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace())); - int responseCount = exec.handler.getReceivedCount(); - String gotData = responseCount > 0 - ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - boolean isTimeout = ex instanceof ReadTimeoutException; - if (Tracing.isTracing()) - { - Tracing.trace("{}; received {} of {} responses{}", - isTimeout ? "Timed out" : "Failed", responseCount, blockFor, gotData); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData); - } - throw ex; - } - catch (DigestMismatchException ex) - { - Tracing.trace("Digest mismatch: {}", ex); + return PartitionIterators.concat(results); + } - ReadRepairMetrics.repairedBlocking.mark(); + private static class SinglePartitionReadLifecycle + { + private final SinglePartitionReadCommand<?> command; + private final AbstractReadExecutor executor; + private final ConsistencyLevel consistency; - // Do a full data read to resolve the correct response (and repair node that need be) - RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size()); - ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver, - ConsistencyLevel.ALL, - exec.getContactedReplicas().size(), - exec.command, - Keyspace.open(exec.command.getKeyspace()), - exec.handler.endpoints); + private PartitionIterator result; + private ReadCallback repairHandler; - if (repairCommands == null) - { - repairCommands = new ArrayList<>(); - repairResponseHandlers = new ArrayList<>(); - } - repairCommands.add(exec.command); - repairResponseHandlers.add(repairHandler); + SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency) + { + this.command = command; + this.executor = AbstractReadExecutor.getReadExecutor(command, consistency); + this.consistency = consistency; + } - MessageOut<ReadCommand> message = exec.command.createMessage(); - for (InetAddress endpoint : exec.getContactedReplicas()) - { - Tracing.trace("Enqueuing full data read to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); - } - } - } + boolean isDone() + { + return result != null; + } - commandsToRetry.clear(); + void doInitialQueries() + { + executor.executeAsync(); + } - // read the results for the digest mismatch retries - if (repairResponseHandlers != null) - { - for (int i = 0; i < repairCommands.size(); i++) - { - ReadCommand command = repairCommands.get(i); - ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i); + void maybeTryAdditionalReplicas() + { + executor.maybeTryAdditionalReplicas(); + } - Row row; - try - { - row = handler.get(); - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // full data requested from each node here, no digests should be sent - } - catch (ReadTimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair requests"); - else - logger.trace("Timed out waiting on digest mismatch repair requests"); - // the caught exception here will have CL.ALL from the repair command, - // not whatever CL the initial command was at (CASSANDRA-7947) - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } + void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException + { + try + { + result = executor.get(); + } + catch (DigestMismatchException ex) + { + Tracing.trace("Digest mismatch: {}", ex); - RowDataResolver resolver = (RowDataResolver)handler.resolver; - try - { - // wait for the repair writes to be acknowledged, to minimize impact on any replica that's - // behind on writes in case the out-of-sync row is read multiple times in quick succession - FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException e) - { - if (Tracing.isTracing()) - Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements"); - else - logger.trace("Timed out waiting on digest mismatch repair acknowledgements"); - int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())); - throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true); - } + ReadRepairMetrics.repairedBlocking.mark(); - // retry any potential short reads - ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row); - if (retryCommand != null) - { - Tracing.trace("Issuing retry for read command"); - if (commandsToRetry == Collections.EMPTY_LIST) - commandsToRetry = new ArrayList<>(); - commandsToRetry.add(retryCommand); - continue; - } + // Do a full data read to resolve the correct response (and repair node that need be) + Keyspace keyspace = Keyspace.open(command.metadata().ksName); + DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size()); + repairHandler = new ReadCallback(resolver, + ConsistencyLevel.ALL, + executor.getContactedReplicas().size(), + command, + keyspace, + executor.handler.endpoints); - if (row != null) - { - row = command.maybeTrim(row); - rows.add(row); - } + for (InetAddress endpoint : executor.getContactedReplicas()) + { + MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint)); + Tracing.trace("Enqueuing full data read to {}", endpoint); + MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler); } } - } while (!commandsToRetry.isEmpty()); - - return rows; - } - - static class LocalReadRunnable extends DroppableRunnable - { - private final ReadCommand command; - private final ReadCallback<ReadResponse, Row> handler; - private final long start = System.nanoTime(); - - LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler) - { - super(MessagingService.Verb.READ); - this.command = command; - this.handler = handler; } - protected void runMayThrow() + void maybeAwaitFullDataRead() throws ReadTimeoutException { + // There wasn't a digest mismatch, we're good + if (repairHandler == null) + return; + + // Otherwise, get the result from the full-data read and check that it's not a short read try { - Keyspace keyspace = Keyspace.open(command.ksName); - Row r = command.getRow(keyspace); - ReadResponse result = ReadVerbHandler.getResponse(command, r); - MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - handler.response(result); + result = repairHandler.get(); } - catch (Throwable t) + catch (DigestMismatchException e) { - handler.onFailure(FBUtilities.getBroadcastAddress()); - if (t instanceof TombstoneOverwhelmingException) - logger.error(t.getMessage()); + throw new AssertionError(e); // full data requested from each node here, no digests should be sent + } + catch (ReadTimeoutException e) + { + if (Tracing.isTracing()) + Tracing.trace("Timed out waiting on digest mismatch repair requests"); else - logger.debug("Timed out waiting on digest mismatch repair requests"); - throw t; ++ logger.trace("Timed out waiting on digest mismatch repair requests"); + // the caught exception here will have CL.ALL from the repair command, + // not whatever CL the initial command was at (CASSANDRA-7947) + int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName)); + throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); } } + + PartitionIterator getResult() + { + assert result != null; + return result; + } } - static class LocalRangeSliceRunnable extends DroppableRunnable + static class LocalReadRunnable extends DroppableRunnable { - private final AbstractRangeCommand command; - private final ReadCallback<RangeSliceReply, Iterable<Row>> handler; + private final ReadCommand command; + private final ReadCallback handler; private final long start = System.nanoTime(); - LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler) + LocalReadRunnable(ReadCommand command, ReadCallback handler) { - super(MessagingService.Verb.RANGE_SLICE); + super(MessagingService.Verb.READ); this.command = command; this.handler = handler; } @@@ -1832,199 -1661,253 +1832,199 @@@ } } - public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level) - throws UnavailableException, ReadFailureException, ReadTimeoutException + private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator { - Tracing.trace("Computing ranges to query"); - long startTime = System.nanoTime(); + private final ReadCallback handler; + private PartitionIterator result; - Keyspace keyspace = Keyspace.open(command.keyspace); - List<Row> rows; - // now scan until we have enough results - try + private SingleRangeResponse(ReadCallback handler) { - int liveRowCount = 0; - boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions(); - rows = new ArrayList<>(); + this.handler = handler; + } - // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be - // expensive in clusters with vnodes) - List<? extends AbstractBounds<RowPosition>> ranges; - if (keyspace.getReplicationStrategy() instanceof LocalStrategy) - ranges = command.keyRange.unwrap(); - else - ranges = getRestrictedRanges(command.keyRange); + private void waitForResponse() throws ReadTimeoutException + { + if (result != null) + return; - // determine the number of rows to be fetched and the concurrency factor - int rowsToBeFetched = command.limit(); - int concurrencyFactor; - if (command.requiresScanningAllRanges()) + try { - // all nodes must be queried - rowsToBeFetched *= ranges.size(); - concurrencyFactor = ranges.size(); - logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}", - ranges.size(), concurrencyFactor); + result = handler.get(); } - else + catch (DigestMismatchException e) { - // our estimate of how many result rows there will be per-range - float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace); - // underestimate how many rows we will get per-range in order to increase the likelihood that we'll - // fetch enough rows in the first round - resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; - concurrencyFactor = resultRowsPerRange == 0.0 - ? 1 - : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange))); - - logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", - resultRowsPerRange, - command.limit(), - ranges.size(), - concurrencyFactor); - Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", - ranges.size(), - concurrencyFactor, - resultRowsPerRange); + throw new AssertionError(e); // no digests in range slices yet } + } + + protected RowIterator computeNext() + { + waitForResponse(); + return result.hasNext() ? result.next() : endOfData(); + } + + public void close() + { + if (result != null) + result.close(); + } + } + + private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator + { + private final Iterator<RangeForQuery> ranges; + private final int totalRangeCount; + private final PartitionRangeReadCommand command; + private final Keyspace keyspace; + private final ConsistencyLevel consistency; + + private final long startTime; + private CountingPartitionIterator sentQueryIterator; + + private int concurrencyFactor; + // The two following "metric" are maintained to improve the concurrencyFactor + // when it was not good enough initially. + private int liveReturned; + private int rangesQueried; + + public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency) + { + this.command = command; + this.concurrencyFactor = concurrencyFactor; + this.startTime = System.nanoTime(); + this.ranges = new RangeMerger(ranges, keyspace, consistency); + this.totalRangeCount = ranges.rangeCount(); + this.consistency = consistency; + this.keyspace = keyspace; + } - boolean haveSufficientRows = false; - int i = 0; - AbstractBounds<RowPosition> nextRange = null; - List<InetAddress> nextEndpoints = null; - List<InetAddress> nextFilteredEndpoints = null; - while (i < ranges.size()) + public RowIterator computeNext() + { + while (sentQueryIterator == null || !sentQueryIterator.hasNext()) { - List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor); - int concurrentFetchStartingIndex = i; - int concurrentRequests = 0; - while ((i - concurrentFetchStartingIndex) < concurrencyFactor) + // If we don't have more range to handle, we're done + if (!ranges.hasNext()) + return endOfData(); + + // else, sends the next batch of concurrent queries (after having close the previous iterator) + if (sentQueryIterator != null) { - AbstractBounds<RowPosition> range = nextRange == null - ? ranges.get(i) - : nextRange; - List<InetAddress> liveEndpoints = nextEndpoints == null - ? getLiveSortedEndpoints(keyspace, range.right) - : nextEndpoints; - List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null - ? consistency_level.filterForQuery(keyspace, liveEndpoints) - : nextFilteredEndpoints; - ++i; - ++concurrentRequests; - - // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take - // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges - // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. - while (i < ranges.size()) - { - nextRange = ranges.get(i); - nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right); - nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints); - - // If the current range right is the min token, we should stop merging because CFS.getRangeSlice - // don't know how to deal with a wrapping range. - // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps - // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking - // wire compatibility, so It's likely easier not to bother; - if (range.right.isMinimum()) - break; - - List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints); - - // Check if there is enough endpoint for the merge to be possible. - if (!consistency_level.isSufficientLiveNodes(keyspace, merged)) - break; - - List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged); - - // Estimate whether merging will be a win or not - if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) - break; - - // If we get there, merge this range and the next one - range = range.withNewRight(nextRange.right); - liveEndpoints = merged; - filteredEndpoints = filteredMerged; - ++i; - } + liveReturned += sentQueryIterator.counter().counted(); + sentQueryIterator.close(); - AbstractRangeCommand nodeCmd = command.forSubRange(range); - - // collect replies and resolve according to consistency level - RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp); - List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace))); - ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints); - handler.assureSufficientLiveNodes(); - resolver.setSources(filteredEndpoints); - if (filteredEndpoints.size() == 1 - && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()) - && OPTIMIZE_LOCAL_REQUESTS) - { - StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get()); - } - else - { - MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage(); - for (InetAddress endpoint : filteredEndpoints) - { - Tracing.trace("Enqueuing request to {}", endpoint); - MessagingService.instance().sendRRWithFailure(message, endpoint, handler); - } - } - scanHandlers.add(Pair.create(nodeCmd, handler)); + // It's not the first batch of queries and we're not done, so we we can use what has been + // returned so far to improve our rows-per-range estimate and update the concurrency accordingly + updateConcurrencyFactor(); } - Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex); + sentQueryIterator = sendNextRequests(); + } - List<AsyncOneResponse> repairResponses = new ArrayList<>(); - for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers) - { - ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right; - RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver; + return sentQueryIterator.next(); + } - try - { - for (Row row : handler.get()) - { - rows.add(row); - if (countLiveRows) - liveRowCount += row.getLiveCount(command.predicate, command.timestamp); - } - repairResponses.addAll(resolver.repairResults); - } - catch (ReadTimeoutException|ReadFailureException ex) - { - // we timed out or failed waiting for responses - int blockFor = consistency_level.blockFor(keyspace); - int responseCount = resolver.responses.size(); - String gotData = responseCount > 0 - ? resolver.isDataPresent() ? " (including data)" : " (only digests)" - : ""; - - boolean isTimeout = ex instanceof ReadTimeoutException; - if (Tracing.isTracing()) - { - Tracing.trace("{}; received {} of {} responses{} for range {} of {}", - (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size()); - } - else if (logger.isDebugEnabled()) - { - logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}", - (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size()); - } - throw ex; - } - catch (DigestMismatchException e) - { - throw new AssertionError(e); // no digests in range slices yet - } + private void updateConcurrencyFactor() + { + if (liveReturned == 0) + { + // we haven't actually gotten any results, so query all remaining ranges at once + concurrencyFactor = totalRangeCount - rangesQueried; + return; + } - // if we're done, great, otherwise, move to the next range - int count = countLiveRows ? liveRowCount : rows.size(); - if (count >= rowsToBeFetched) - { - haveSufficientRows = true; - break; - } - } + // Otherwise, compute how many rows per range we got on average and pick a concurrency factor + // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries. + int remainingRows = command.limits().count() - liveReturned; + float rowsPerRange = (float)liveReturned / (float)rangesQueried; + concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange))); - logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", ++ logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", + rowsPerRange, (int) remainingRows, concurrencyFactor); + } - try - { - FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - // We got all responses, but timed out while repairing - int blockFor = consistency_level.blockFor(keyspace); - if (Tracing.isTracing()) - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - else - logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true); - } + private SingleRangeResponse query(RangeForQuery toQuery) + { + PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range); + + DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size()); + + int blockFor = consistency.blockFor(keyspace); + int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor); + List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses); + ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints); - if (haveSufficientRows) - return command.postReconciliationProcessing(rows); + handler.assureSufficientLiveNodes(); - // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor - // based on the results we've seen so far (as long as we still have ranges left to query) - if (i < ranges.size()) + if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0))) + { + StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get()); + } + else + { + for (InetAddress endpoint : toQuery.filteredEndpoints) { - float fetchedRows = countLiveRows ? liveRowCount : rows.size(); - float remainingRows = rowsToBeFetched - fetchedRows; - float actualRowsPerRange; - if (fetchedRows == 0.0) - { - // we haven't actually gotten any results, so query all remaining ranges at once - actualRowsPerRange = 0.0f; - concurrencyFactor = ranges.size() - i; - } - else - { - actualRowsPerRange = fetchedRows / i; - concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange))); - } - logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", - actualRowsPerRange, (int) remainingRows, concurrencyFactor); + MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint)); + Tracing.trace("Enqueuing request to {}", endpoint); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); } } + + return new SingleRangeResponse(handler); } - finally + + private CountingPartitionIterator sendNextRequests() + { + List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor); + for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++) + { + concurrentQueries.add(query(ranges.next())); + ++rangesQueried; + } + + Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size()); + // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to + // enforce any particular limit at this point (this could break code than rely on postReconciliationProcessing), hence the DataLimits.NONE. + return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec()); + } + + public void close() { - long latency = System.nanoTime() - startTime; - rangeMetrics.addNano(latency); - Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + try + { + if (sentQueryIterator != null) + sentQueryIterator.close(); + } + finally + { + long latency = System.nanoTime() - startTime; + rangeMetrics.addNano(latency); + Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS); + } } - return command.postReconciliationProcessing(rows); + } + + @SuppressWarnings("resource") + public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel) + throws UnavailableException, ReadFailureException, ReadTimeoutException + { + Tracing.trace("Computing ranges to query"); + + Keyspace keyspace = Keyspace.open(command.metadata().ksName); + RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel); + + // our estimate of how many result rows there will be per-range + float resultsPerRange = estimateResultsPerRange(command, keyspace); + // underestimate how many rows we will get per-range in order to increase the likelihood that we'll + // fetch enough rows in the first round + resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN; + int concurrencyFactor = resultsPerRange == 0.0 + ? 1 + : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange))); - logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", ++ logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", + resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor); + Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange); + + // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. + + return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec()); } public Map<String, List<String>> getSchemaVersions() @@@ -2430,52 -2287,6 +2430,52 @@@ logger.warn("Some hints were not written before shutdown. This is not supposed to happen. You should (a) run repair, and (b) file a bug report"); } + private static AtomicInteger getHintsInProgressFor(InetAddress destination) + { + try + { + return hintsInProgress.load(destination); + } + catch (Exception e) + { + throw new AssertionError(e); + } + } + + public static Future<Void> submitHint(Mutation mutation, InetAddress target, AbstractWriteResponseHandler<IMutation> responseHandler) + { + return submitHint(mutation, Collections.singleton(target), responseHandler); + } + + public static Future<Void> submitHint(Mutation mutation, + Collection<InetAddress> targets, + AbstractWriteResponseHandler<IMutation> responseHandler) + { + HintRunnable runnable = new HintRunnable(targets) + { + public void runMayThrow() + { - logger.debug("Adding hints for {}", targets); ++ logger.trace("Adding hints for {}", targets); + HintsService.instance.write(Iterables.transform(targets, StorageService.instance::getHostIdForEndpoint), + Hint.create(mutation, System.currentTimeMillis())); + targets.forEach(HintsService.instance.metrics::incrCreatedHints); + // Notify the handler only for CL == ANY + if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY) + responseHandler.response(null); + } + }; + + return submitHint(runnable); + } + + private static Future<Void> submitHint(HintRunnable runnable) + { + StorageMetrics.totalHintsInProgress.inc(runnable.targets.size()); + for (InetAddress target : runnable.targets) + getHintsInProgressFor(target).incrementAndGet(); + return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); + } + public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(); } public void setRpcTimeout(Long timeoutInMillis) { DatabaseDescriptor.setRpcTimeout(timeoutInMillis); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java index 7017bc1,1abc928..538d128 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@@ -688,10 -538,10 +688,10 @@@ public class CassandraServer implement // request by page if this is a large row if (cfs.getMeanColumns() > 0) { - int averageColumnSize = (int) (cfs.metric.meanRowSize.getValue() / cfs.getMeanColumns()); + int averageColumnSize = (int) (cfs.metric.meanPartitionSize.getValue() / cfs.getMeanColumns()); pageSize = Math.min(COUNT_PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize); pageSize = Math.max(2, pageSize); - logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize); + logger.trace("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize); } else { @@@ -1866,9 -1492,10 +1866,9 @@@ requestScheduler.release(); } - public String system_add_column_family(CfDef cf_def) - throws InvalidRequestException, SchemaDisagreementException, TException + public String system_add_column_family(CfDef cf_def) throws TException { - logger.debug("add_column_family"); + logger.trace("add_column_family"); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/thrift/ThriftValidation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/utils/CLibrary.java ----------------------------------------------------------------------