Refactor read executor and response resolver, abstract read repair

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14058
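One of the smaller pieces of this patch, visible in the AsyncOneResponse.java hunk further down, replaces that class's hand-rolled synchronized/wait/notify plumbing with Guava's AbstractFuture. A rough, self-contained sketch of that pattern follows; it is not code from the patch, and the class and method names (OneShotResponse, onResponse) are made up purely for illustration.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.AbstractFuture;

// Illustrative only: a single-reply callback built on Guava's AbstractFuture,
// mirroring the shape of the refactored AsyncOneResponse without Cassandra's
// messaging types.
public class OneShotResponse<T> extends AbstractFuture<T>
{
    private final long start = System.nanoTime();

    // Called once by whatever delivers the reply; completes the future.
    public void onResponse(T payload)
    {
        set(payload);
    }

    // Waits for the reply, counting the timeout from construction time rather
    // than from the moment get() is called.
    @Override
    public T get(long timeout, TimeUnit unit) throws TimeoutException
    {
        long remainingNanos = unit.toNanos(timeout) - (System.nanoTime() - start);
        if (remainingNanos <= 0)
            throw new TimeoutException("Operation timed out.");
        try
        {
            return super.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException | ExecutionException e)
        {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) throws Exception
    {
        OneShotResponse<String> response = new OneShotResponse<>();
        response.onResponse("payload"); // normally invoked by the messaging layer
        System.out.println(response.get(1, TimeUnit.SECONDS));
    }
}

The real class in the diff keeps the IAsyncCallback plumbing on top of this shape, completing the future from response(MessageIn<T>) and adding a VisibleForTesting immediate(value) factory.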
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39807ba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39807ba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39807ba4

Branch: refs/heads/trunk
Commit: 39807ba48ed2e02223014fbf47dce21d4124b380
Parents: 4a50f44
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Tue Nov 14 17:38:10 2017 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Mar 1 17:52:54 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../apache/cassandra/db/ConsistencyLevel.java | 2 +-
 .../UnfilteredPartitionIterators.java | 10 +
 .../db/rows/UnfilteredRowIterators.java | 11 +
 .../apache/cassandra/net/AsyncOneResponse.java | 71 +-
 .../cassandra/service/AbstractReadExecutor.java | 412 -------
 .../cassandra/service/AsyncRepairCallback.java | 60 -
 .../apache/cassandra/service/DataResolver.java | 838 -------------
 .../service/DigestMismatchException.java | 35 -
 .../cassandra/service/DigestResolver.java | 103 --
 .../apache/cassandra/service/ReadCallback.java | 266 -----
 .../cassandra/service/ReadRepairDecision.java | 23 -
 .../cassandra/service/ResponseResolver.java | 73 --
 .../apache/cassandra/service/StorageProxy.java | 148 +--
 .../service/reads/AbstractReadExecutor.java | 482 ++++++++
 .../service/reads/AsyncRepairCallback.java | 61 +
 .../cassandra/service/reads/DataResolver.java | 133 +++
 .../cassandra/service/reads/DigestResolver.java | 93 ++
 .../cassandra/service/reads/ReadCallback.java | 213 ++++
 .../service/reads/ReadRepairDecision.java | 23 +
 .../service/reads/ResponseResolver.java | 66 ++
 .../reads/ShortReadPartitionsProtection.java | 187 +++
 .../service/reads/ShortReadProtection.java | 74 ++
 .../service/reads/ShortReadRowsProtection.java | 197 +++
 .../reads/repair/BlockingReadRepair.java | 234 ++++
 .../service/reads/repair/NoopReadRepair.java | 63 +
 .../repair/PartitionIteratorMergeListener.java | 92 ++
 .../service/reads/repair/ReadRepair.java | 72 ++
 .../service/reads/repair/RepairListener.java | 34 +
 .../reads/repair/RowIteratorMergeListener.java | 336 ++++++
 .../cassandra/service/DataResolverTest.java | 1122 ------------------
 .../cassandra/service/ReadExecutorTest.java | 215 ----
 .../service/reads/DataResolverTest.java | 1057 +++++++++++++++++
 .../service/reads/ReadExecutorTest.java | 216 ++++
 .../reads/repair/TestableReadRepair.java | 109 ++
 35 files changed, 3826 insertions(+), 3306 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e7a599..a6c0323 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
  * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
  * Add a few options to nodetool verify (CASSANDRA-14201)
  * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index f93e737..19f2e10 100644
--- 
a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.service.ReadRepairDecision; +import org.apache.cassandra.service.reads.ReadRepairDecision; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.locator.AbstractReplicationStrategy; http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index dff6dae..edb7833 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -49,6 +49,16 @@ public abstract class UnfilteredPartitionIterators { public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions); public void close(); + + public static MergeListener NOOP = new MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + return UnfilteredRowIterators.MergeListener.NOOP; + } + + public void close() {} + }; } @SuppressWarnings("resource") // The created resources are returned right away http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index e1c6685..0244531 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -95,6 +95,17 @@ public abstract class UnfilteredRowIterators public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions); public void close(); + + public static MergeListener NOOP = new MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {} + + public void onMergedRows(Row merged, Row[] versions) {} + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) {} + + public void close() {} + }; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/net/AsyncOneResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/AsyncOneResponse.java b/src/java/org/apache/cassandra/net/AsyncOneResponse.java index b7ef227..4e004d2 100644 --- a/src/java/org/apache/cassandra/net/AsyncOneResponse.java +++ b/src/java/org/apache/cassandra/net/AsyncOneResponse.java @@ -17,67 +17,54 @@ */ package org.apache.cassandra.net; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractFuture; + /** * A callback specialized for returning a value from a single target; that is, this is for messages * that we only send to one recipient. */ -public class AsyncOneResponse<T> implements IAsyncCallback<T> +public class AsyncOneResponse<T> extends AbstractFuture<T> implements IAsyncCallback<T> { - private T result; - private boolean done; private final long start = System.nanoTime(); - public T get(long timeout, TimeUnit tu) throws TimeoutException + public void response(MessageIn<T> response) { - timeout = tu.toNanos(timeout); - boolean interrupted = false; - try + set(response.payload); + } + + public boolean isLatencyForSnitch() + { + return false; + } + + @Override + public T get(long timeout, TimeUnit unit) throws TimeoutException + { + long adjustedTimeout = unit.toNanos(timeout) - (System.nanoTime() - start); + if (adjustedTimeout <= 0) { - synchronized (this) - { - while (!done) - { - try - { - long overallTimeout = timeout - (System.nanoTime() - start); - if (overallTimeout <= 0) - { - throw new TimeoutException("Operation timed out."); - } - TimeUnit.NANOSECONDS.timedWait(this, overallTimeout); - } - catch (InterruptedException e) - { - interrupted = true; - } - } - } + throw new TimeoutException("Operation timed out."); } - finally + try { - if (interrupted) - { - Thread.currentThread().interrupt(); - } + return super.get(timeout, TimeUnit.NANOSECONDS); } - return result; - } - - public synchronized void response(MessageIn<T> response) - { - if (!done) + catch (InterruptedException | ExecutionException e) { - result = response.payload; - done = true; - this.notifyAll(); + throw new AssertionError(e); } } - public boolean isLatencyForSnitch() + @VisibleForTesting + public static <T> AsyncOneResponse<T> immediate(T value) { - return false; + AsyncOneResponse<T> response = new AsyncOneResponse<>(); + response.set(value); + return response; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java deleted file mode 100644 index e06131e..0000000 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Iterables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.exceptions.ReadFailureException; -import org.apache.cassandra.exceptions.ReadTimeoutException; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.metrics.ReadRepairMetrics; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.SpeculativeRetryParam; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; - -/** - * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel. - * - * Optionally, may perform additional requests to provide redundancy against replica failure: - * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while - * SpeculatingReadExecutor will wait until it looks like the original request is in danger - * of timing out before performing extra reads. - */ -public abstract class AbstractReadExecutor -{ - private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class); - - protected final ReadCommand command; - protected final List<InetAddressAndPort> targetReplicas; - protected final ReadCallback handler; - protected final TraceState traceState; - protected final ColumnFamilyStore cfs; - - AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) - { - this.command = command; - this.targetReplicas = targetReplicas; - this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime); - this.cfs = cfs; - this.traceState = Tracing.instance.get(); - - // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes - // knows how to produce older digest but the reverse is not true. - // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once - // we stop being compatible with pre-3.0 nodes. 
- int digestVersion = MessagingService.current_version; - for (InetAddressAndPort replica : targetReplicas) - digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica)); - command.setDigestVersion(digestVersion); - } - - protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints) - { - makeRequests(command, endpoints); - - } - - protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints) - { - makeRequests(command.copyAsDigestQuery(), endpoints); - } - - private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints) - { - boolean hasLocalEndpoint = false; - - for (InetAddressAndPort endpoint : endpoints) - { - if (StorageProxy.canDoLocalRequest(endpoint)) - { - hasLocalEndpoint = true; - continue; - } - - if (traceState != null) - traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); - logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); - MessageOut<ReadCommand> message = readCommand.createMessage(); - MessagingService.instance().sendRRWithFailure(message, endpoint, handler); - } - - // We delay the local (potentially blocking) read till the end to avoid stalling remote requests. - if (hasLocalEndpoint) - { - logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data"); - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler)); - } - } - - /** - * Perform additional requests if it looks like the original will time out. May block while it waits - * to see if the original requests are answered first. - */ - public abstract void maybeTryAdditionalReplicas(); - - /** - * Get the replicas involved in the [finished] request. - * - * @return target replicas + the extra replica, *IF* we speculated. - */ - public abstract Collection<InetAddressAndPort> getContactedReplicas(); - - /** - * send the initial set of requests - */ - public abstract void executeAsync(); - - /** - * wait for an answer. Blocks until success or timeout, so it is caller's - * responsibility to call maybeTryAdditionalReplicas first. - */ - public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException - { - try - { - return handler.get(); - } - catch (ReadTimeoutException e) - { - try - { - onReadTimeout(); - } - finally - { - throw e; - } - } - } - - private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata) - { - if (metadata.params.readRepairChance > 0d || - metadata.params.dcLocalReadRepairChance > 0) - { - double chance = ThreadLocalRandom.current().nextDouble(); - if (metadata.params.readRepairChance > chance) - return ReadRepairDecision.GLOBAL; - - if (metadata.params.dcLocalReadRepairChance > chance) - return ReadRepairDecision.DC_LOCAL; - } - - return ReadRepairDecision.NONE; - } - - /** - * @return an executor appropriate for the configured speculative read policy - */ - public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException - { - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey()); - // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses - ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM - ? 
ReadRepairDecision.NONE - : newReadRepairDecision(command.metadata()); - List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision); - - // Throw UAE early if we don't have enough replicas. - consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); - - if (repairDecision != ReadRepairDecision.NONE) - { - Tracing.trace("Read-repair {}", repairDecision); - ReadRepairMetrics.attempted.mark(); - } - - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); - SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry; - - // Speculative retry is disabled *OR* - // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses - if (retry.equals(SpeculativeRetryParam.NONE) - | consistencyLevel == ConsistencyLevel.EACH_QUORUM) - return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false); - - // There are simply no extra replicas to speculate. - // Handle this separately so it can log failed attempts to speculate due to lack of replicas - if (consistencyLevel.blockFor(keyspace) == allReplicas.size()) - return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true); - - if (targetReplicas.size() == allReplicas.size()) - { - // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. - // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy - // (same amount of requests in total, but we turn 1 digest request into a full blown data request). - return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - } - - // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. - InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size()); - // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so - // we might have to find a replacement that's not already in targetReplicas. - if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica)) - { - for (InetAddressAndPort address : allReplicas) - { - if (!targetReplicas.contains(address)) - { - extraReplica = address; - break; - } - } - } - targetReplicas.add(extraReplica); - - if (retry.equals(SpeculativeRetryParam.ALWAYS)) - return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - else // PERCENTILE or CUSTOM. 
- return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - } - - /** - * Returns true if speculation should occur and if it should then block until it is time to - * send the speculative reads - */ - boolean shouldSpeculateAndMaybeWait() - { - // no latency information, or we're overloaded - if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout())) - return false; - - return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS); - } - - void onReadTimeout() {} - - public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor - { - /** - * If never speculating due to lack of replicas - * log it is as a failure if it should have happened - * but couldn't due to lack of replicas - */ - private final boolean logFailedSpeculation; - - public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation) - { - super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - this.logFailedSpeculation = logFailedSpeculation; - } - - public void executeAsync() - { - makeDataRequests(targetReplicas.subList(0, 1)); - if (targetReplicas.size() > 1) - makeDigestRequests(targetReplicas.subList(1, targetReplicas.size())); - } - - public void maybeTryAdditionalReplicas() - { - if (shouldSpeculateAndMaybeWait() && logFailedSpeculation) - { - cfs.metric.speculativeInsufficientReplicas.inc(); - } - } - - public Collection<InetAddressAndPort> getContactedReplicas() - { - return targetReplicas; - } - } - - static class SpeculatingReadExecutor extends AbstractReadExecutor - { - private volatile boolean speculated = false; - - public SpeculatingReadExecutor(Keyspace keyspace, - ColumnFamilyStore cfs, - ReadCommand command, - ConsistencyLevel consistencyLevel, - List<InetAddressAndPort> targetReplicas, - long queryStartNanoTime) - { - super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - } - - public void executeAsync() - { - // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know - // that the last replica in our list is "extra." - List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1); - - if (handler.blockfor < initialReplicas.size()) - { - // We're hitting additional targets for read repair. Since our "extra" replica is the least- - // preferred by the snitch, we do an extra data read to start with against a replica more - // likely to reply; better to let RR fail than the entire query. - makeDataRequests(initialReplicas.subList(0, 2)); - if (initialReplicas.size() > 2) - makeDigestRequests(initialReplicas.subList(2, initialReplicas.size())); - } - else - { - // not doing read repair; all replies are important, so it doesn't matter which nodes we - // perform data reads against vs digest. - makeDataRequests(initialReplicas.subList(0, 1)); - if (initialReplicas.size() > 1) - makeDigestRequests(initialReplicas.subList(1, initialReplicas.size())); - } - } - - public void maybeTryAdditionalReplicas() - { - if (shouldSpeculateAndMaybeWait()) - { - //Handle speculation stats first in case the callback fires immediately - speculated = true; - cfs.metric.speculativeRetries.inc(); - // Could be waiting on the data, or on enough digests. 
- ReadCommand retryCommand = command; - if (handler.resolver.isDataPresent()) - retryCommand = command.copyAsDigestQuery(); - - InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas); - if (traceState != null) - traceState.trace("speculating read retry on {}", extraReplica); - logger.trace("speculating read retry on {}", extraReplica); - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler); - } - } - - public Collection<InetAddressAndPort> getContactedReplicas() - { - return speculated - ? targetReplicas - : targetReplicas.subList(0, targetReplicas.size() - 1); - } - - @Override - void onReadTimeout() - { - //Shouldn't be possible to get here without first attempting to speculate even if the - //timing is bad - assert speculated; - cfs.metric.speculativeFailedRetries.inc(); - } - } - - private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor - { - public AlwaysSpeculatingReadExecutor(Keyspace keyspace, - ColumnFamilyStore cfs, - ReadCommand command, - ConsistencyLevel consistencyLevel, - List<InetAddressAndPort> targetReplicas, - long queryStartNanoTime) - { - super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); - } - - public void maybeTryAdditionalReplicas() - { - // no-op - } - - public Collection<InetAddressAndPort> getContactedReplicas() - { - return targetReplicas; - } - - @Override - public void executeAsync() - { - makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1)); - if (targetReplicas.size() > 2) - makeDigestRequests(targetReplicas.subList(2, targetReplicas.size())); - cfs.metric.speculativeRetries.inc(); - } - - @Override - void onReadTimeout() - { - cfs.metric.speculativeFailedRetries.inc(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/AsyncRepairCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java deleted file mode 100644 index d613f3d..0000000 --- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.db.ReadResponse; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.utils.WrappedRunnable; - -public class AsyncRepairCallback implements IAsyncCallback<ReadResponse> -{ - private final DataResolver repairResolver; - private final int blockfor; - protected final AtomicInteger received = new AtomicInteger(0); - - public AsyncRepairCallback(DataResolver repairResolver, int blockfor) - { - this.repairResolver = repairResolver; - this.blockfor = blockfor; - } - - public void response(MessageIn<ReadResponse> message) - { - repairResolver.preprocess(message); - if (received.incrementAndGet() == blockfor) - { - StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable() - { - protected void runMayThrow() - { - repairResolver.compareResponses(); - } - }); - } - } - - public boolean isLatencyForSnitch() - { - return true; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java deleted file mode 100644 index 82db754..0000000 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ /dev/null @@ -1,838 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service; - -import java.util.*; -import java.util.concurrent.TimeoutException; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.filter.DataLimits.Counter; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.transform.*; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.ExcludingBounds; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.exceptions.ReadTimeoutException; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.*; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -public class DataResolver extends ResponseResolver -{ - private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = - Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); - - @VisibleForTesting - final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); - private final long queryStartNanoTime; - private final boolean enforceStrictLiveness; - - DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) - { - super(keyspace, command, consistency, maxResponseCount); - this.queryStartNanoTime = queryStartNanoTime; - this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); - } - - public PartitionIterator getData() - { - ReadResponse response = responses.iterator().next().payload; - return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); - } - - public boolean isDataPresent() - { - return !responses.isEmpty(); - } - - public void compareResponses() - { - // We need to fully consume the results to trigger read repairs if appropriate - try (PartitionIterator iterator = resolve()) - { - PartitionIterators.consume(iterator); - } - } - - public PartitionIterator resolve() - { - // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here - // at the beginning of this method), so grab the response count once and use that through the method. - int count = responses.size(); - List<UnfilteredPartitionIterator> iters = new ArrayList<>(count); - InetAddressAndPort[] sources = new InetAddressAndPort[count]; - for (int i = 0; i < count; i++) - { - MessageIn<ReadResponse> msg = responses.get(i); - iters.add(msg.payload.makeIterator(command)); - sources[i] = msg.from; - } - - /* - * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, - * have more rows than the client requested. To make sure that we still conform to the original limit, - * we apply a top-level post-reconciliation counter to the merged partition iterator. - * - * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied - * to the current partition to work. 
For this reason we have to apply the counter transformation before - * empty partition discard logic kicks in - for it will eagerly consume the iterator. - * - * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions - * - * See CASSANDRA-13747 for more details. - */ - - DataLimits.Counter mergedResultCounter = - command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); - - UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter); - FilteredPartitions filtered = - FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); - PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); - return Transformation.apply(counted, new EmptyPartitionsDiscarder()); - } - - private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, - InetAddressAndPort[] sources, - DataLimits.Counter mergedResultCounter) - { - // If we have only one results, there is no read repair to do and we can't get short reads - if (results.size() == 1) - return results.get(0); - - /* - * So-called short reads stems from nodes returning only a subset of the results they have due to the limit, - * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother. - */ - if (!command.limits().isUnlimited()) - for (int i = 0; i < results.size(); i++) - results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter)); - - return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources)); - } - - private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener - { - private final InetAddressAndPort[] sources; - - private RepairMergeListener(InetAddressAndPort[] sources) - { - this.sources = sources; - } - - public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) - { - return new MergeListener(partitionKey, columns(versions), isReversed(versions)); - } - - private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) - { - Columns statics = Columns.NONE; - Columns regulars = Columns.NONE; - for (UnfilteredRowIterator iter : versions) - { - if (iter == null) - continue; - - RegularAndStaticColumns cols = iter.columns(); - statics = statics.mergeTo(cols.statics); - regulars = regulars.mergeTo(cols.regulars); - } - return new RegularAndStaticColumns(statics, regulars); - } - - private boolean isReversed(List<UnfilteredRowIterator> versions) - { - for (UnfilteredRowIterator iter : versions) - { - if (iter == null) - continue; - - // Everything will be in the same order - return iter.isReverseOrder(); - } - - assert false : "Expected at least one iterator"; - return false; - } - - public void close() - { - try - { - FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout()); - } - catch (TimeoutException ex) - { - // We got all responses, but timed out while repairing - int blockFor = consistency.blockFor(keyspace); - if (Tracing.isTracing()) - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - else - logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); - - throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true); - } - } - - private 
class MergeListener implements UnfilteredRowIterators.MergeListener - { - private final DecoratedKey partitionKey; - private final RegularAndStaticColumns columns; - private final boolean isReversed; - private final PartitionUpdate.Builder[] repairs = new PartitionUpdate.Builder[sources.length]; - - private final Row.Builder[] currentRows = new Row.Builder[sources.length]; - private final RowDiffListener diffListener; - - // The partition level deletion for the merge row. - private DeletionTime partitionLevelDeletion; - // When merged has a currently open marker, its time. null otherwise. - private DeletionTime mergedDeletionTime; - // For each source, the time of the current deletion as known by the source. - private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length]; - // For each source, record if there is an open range to send as repair, and from where. - private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length]; - - private MergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed) - { - this.partitionKey = partitionKey; - this.columns = columns; - this.isReversed = isReversed; - - this.diffListener = new RowDiffListener() - { - public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) - { - if (merged != null && !merged.equals(original)) - currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); - } - - public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) - { - if (merged != null && !merged.equals(original)) - currentRow(i, clustering).addRowDeletion(merged); - } - - public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original) - { - if (merged != null && !merged.equals(original)) - currentRow(i, clustering).addComplexDeletion(column, merged); - } - - public void onCell(int i, Clustering clustering, Cell merged, Cell original) - { - if (merged != null && !merged.equals(original) && isQueried(merged)) - currentRow(i, clustering).addCell(merged); - } - - private boolean isQueried(Cell cell) - { - // When we read, we may have some cell that have been fetched but are not selected by the user. Those cells may - // have empty values as optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair. - // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL - // semantic (making sure we can always distinguish between a row that doesn't exist from one that do exist but has - /// no value for the column requested by the user) and so it won't be unexpected by the user that those columns are - // not repaired. - ColumnMetadata column = cell.column(); - ColumnFilter filter = command.columnFilter(); - return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column); - } - }; - } - - private PartitionUpdate.Builder update(int i) - { - if (repairs[i] == null) - repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1); - return repairs[i]; - } - - /** - * The partition level deletion with with which source {@code i} is currently repaired, or - * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was - * up to date on it). The output* of this method is only valid after the call to - * {@link #onMergedPartitionLevelDeletion}. 
- */ - private DeletionTime partitionLevelRepairDeletion(int i) - { - return repairs[i] == null ? DeletionTime.LIVE : repairs[i].partitionLevelDeletion(); - } - - private Row.Builder currentRow(int i, Clustering clustering) - { - if (currentRows[i] == null) - { - currentRows[i] = BTreeRow.sortedBuilder(); - currentRows[i].newRow(clustering); - } - return currentRows[i]; - } - - public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) - { - this.partitionLevelDeletion = mergedDeletion; - for (int i = 0; i < versions.length; i++) - { - if (mergedDeletion.supersedes(versions[i])) - update(i).addPartitionDeletion(mergedDeletion); - } - } - - public void onMergedRows(Row merged, Row[] versions) - { - // If a row was shadowed post merged, it must be by a partition level or range tombstone, and we handle - // those case directly in their respective methods (in other words, it would be inefficient to send a row - // deletion as repair when we know we've already send a partition level or range tombstone that covers it). - if (merged.isEmpty()) - return; - - Rows.diff(diffListener, merged, versions); - for (int i = 0; i < currentRows.length; i++) - { - if (currentRows[i] != null) - update(i).add(currentRows[i].build()); - } - Arrays.fill(currentRows, null); - } - - private DeletionTime currentDeletion() - { - return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime; - } - - public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { - try - { - // The code for merging range tombstones is a tad complex and we had the assertions there triggered - // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights - // when that happen without more context that what the assertion errors give us however, hence the - // catch here that basically gather as much as context as reasonable. - internalOnMergedRangeTombstoneMarkers(merged, versions); - } - catch (AssertionError e) - { - // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd - // rather get more info to debug than not. - TableMetadata table = command.metadata(); - String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, responses:%n %s", - table, - merged == null ? "null" : merged.toString(table), - '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', - Arrays.toString(sources), - makeResponsesDebugString()); - throw new AssertionError(details, e); - } - } - - private String makeResponsesDebugString() - { - return Joiner.on(",\n") - .join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); - } - - private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { - // The current deletion as of dealing with this marker. - DeletionTime currentDeletion = currentDeletion(); - - for (int i = 0; i < versions.length; i++) - { - RangeTombstoneMarker marker = versions[i]; - - // Update what the source now thinks is the current deletion - if (marker != null) - sourceDeletionTime[i] = marker.isOpen(isReversed) ? 
marker.openDeletionTime(isReversed) : null; - - // If merged == null, some of the source is opening or closing a marker - if (merged == null) - { - // but if it's not this source, move to the next one - if (marker == null) - continue; - - // We have a close and/or open marker for a source, with nothing corresponding in merged. - // Because merged is a superset, this imply that we have a current deletion (being it due to an - // early opening in merged or a partition level deletion) and that this deletion will still be - // active after that point. Further whatever deletion was open or is open by this marker on the - // source, that deletion cannot supersedes the current one. - // - // But while the marker deletion (before and/or after this point) cannot supersede the current - // deletion, we want to know if it's equal to it (both before and after), because in that case - // the source is up to date and we don't want to include repair. - // - // So in practice we have 2 possible case: - // 1) the source was up-to-date on deletion up to that point: then it won't be from that point - // on unless it's a boundary and the new opened deletion time is also equal to the current - // deletion (note that this implies the boundary has the same closing and opening deletion - // time, which should generally not happen, but can due to legacy reading code not avoiding - // this for a while, see CASSANDRA-13237). - // 2) the source wasn't up-to-date on deletion up to that point and it may now be (if it isn't - // we just have nothing to do for that marker). - assert !currentDeletion.isLive() : currentDeletion.toString(); - - // Is the source up to date on deletion? It's up to date if it doesn't have an open RT repair - // nor an "active" partition level deletion (where "active" means that it's greater or equal - // to the current deletion: if the source has a repaired partition deletion lower than the - // current deletion, this means the current deletion is due to a previously open range tombstone, - // and if the source isn't currently repaired for that RT, then it means it's up to date on it). - DeletionTime partitionRepairDeletion = partitionLevelRepairDeletion(i); - if (markerToRepair[i] == null && currentDeletion.supersedes(partitionRepairDeletion)) - { - // Since there is an ongoing merged deletion, the only way we don't have an open repair for - // this source is that it had a range open with the same deletion as current and it's - // closing it. - assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed)) - : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata())); - - // and so unless it's a boundary whose opening deletion time is still equal to the current - // deletion (see comment above for why this can actually happen), we have to repair the source - // from that point on. - if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))) - markerToRepair[i] = marker.closeBound(isReversed).invert(); - } - // In case 2) above, we only have something to do if the source is up-to-date after that point - // (which, since the source isn't up-to-date before that point, means we're opening a new deletion - // that is equal to the current one). 
- else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))) - { - closeOpenMarker(i, marker.openBound(isReversed).invert()); - } - } - else - { - // We have a change of current deletion in merged (potentially to/from no deletion at all). - - if (merged.isClose(isReversed)) - { - // We're closing the merged range. If we're recorded that this should be repaird for the - // source, close and add said range to the repair to send. - if (markerToRepair[i] != null) - closeOpenMarker(i, merged.closeBound(isReversed)); - - } - - if (merged.isOpen(isReversed)) - { - // If we're opening a new merged range (or just switching deletion), then unless the source - // is up to date on that deletion (note that we've updated what the source deleteion is - // above), we'll have to sent the range to the source. - DeletionTime newDeletion = merged.openDeletionTime(isReversed); - DeletionTime sourceDeletion = sourceDeletionTime[i]; - if (!newDeletion.equals(sourceDeletion)) - markerToRepair[i] = merged.openBound(isReversed); - } - } - } - - if (merged != null) - mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null; - } - - private void closeOpenMarker(int i, ClusteringBound close) - { - ClusteringBound open = markerToRepair[i]; - update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion())); - markerToRepair[i] = null; - } - - public void close() - { - for (int i = 0; i < repairs.length; i++) - if (null != repairs[i]) - sendRepairMutation(repairs[i].build(), sources[i]); - } - - private void sendRepairMutation(PartitionUpdate partition, InetAddressAndPort destination) - { - Mutation mutation = new Mutation(partition); - int messagingVersion = MessagingService.instance().getVersion(destination); - - int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); - int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); - - if (mutationSize <= maxMutationSize) - { - Tracing.trace("Sending read-repair-mutation to {}", destination); - // use a separate verb here to avoid writing hints on timeouts - MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); - repairResults.add(MessagingService.instance().sendRR(message, destination)); - ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark(); - } - else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) - { - logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - command.metadata(), - command.metadata().partitionKeyType.getString(partitionKey.getKey()), - destination); - } - else - { - logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", - mutationSize, - maxMutationSize, - command.metadata(), - command.metadata().partitionKeyType.getString(partitionKey.getKey()), - destination); - - int blockFor = consistency.blockFor(keyspace); - Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); - } - } - } - } - - private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions, - InetAddressAndPort source, - DataLimits.Counter mergedResultCounter) - { - DataLimits.Counter singleResultCounter = - command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); - - ShortReadPartitionsProtection protection = - new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime); - - /* - * The order of extention and transformations is important here. Extending with more partitions has to happen - * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will - * be called on the first partition of the extended iterator. - * - * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will - * be called last, after the extension done by SRRP.applyToPartition() call. That way we preserve the same order - * when it comes to calling SRRP.moreContents() and applyToRow() callbacks. - * - * See ShortReadPartitionsProtection.applyToPartition() for more details. - */ - - // extend with moreContents() only if it's a range read command with no partition key specified - if (!command.isLimitedToOnePartition()) - partitions = MorePartitions.extend(partitions, protection); // register SRPP.moreContents() - - partitions = Transformation.apply(partitions, protection); // register SRPP.applyToPartition() - partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter - - return partitions; - } - - /* - * We have a potential short read if the result from a given node contains the requested number of rows - * (i.e. it has stopped returning results due to the limit), but some of them haven't - * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones. - * - * If that is the case, then that node may have more rows that we should fetch, as otherwise we could - * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which - * which we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would - * otherwise return incorrectly. - */ - private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> - { - private final InetAddressAndPort source; - - private final DataLimits.Counter singleResultCounter; // unmerged per-source counter - private final DataLimits.Counter mergedResultCounter; // merged end-result counter - - private DecoratedKey lastPartitionKey; // key of the last observed partition - - private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call - - private final long queryStartNanoTime; - - private ShortReadPartitionsProtection(InetAddressAndPort source, - DataLimits.Counter singleResultCounter, - DataLimits.Counter mergedResultCounter, - long queryStartNanoTime) - { - this.source = source; - this.singleResultCounter = singleResultCounter; - this.mergedResultCounter = mergedResultCounter; - this.queryStartNanoTime = queryStartNanoTime; - } - - @Override - public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) - { - partitionsFetched = true; - - lastPartitionKey = partition.partitionKey(); - - /* - * Extend for moreContents() then apply protection to track lastClustering by applyToRow(). - * - * If we don't apply the transformation *after* extending the partition with MoreRows, - * applyToRow() method of protection will not be called on the first row of the new extension iterator. 
- */ - ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey()); - return Transformation.apply(MoreRows.extend(partition, protection), protection); - } - - /* - * We only get here once all the rows and partitions in this iterator have been iterated over, and so - * if the node had returned the requested number of rows but we still get here, then some results were - * skipped during reconciliation. - */ - public UnfilteredPartitionIterator moreContents() - { - // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit - assert !mergedResultCounter.isDone(); - - // we do not apply short read protection when we have no limits at all - assert !command.limits().isUnlimited(); - - /* - * If this is a single partition read command or an (indexed) partition range read command with - * a partition key specified, then we can't and shouldn't try fetch more partitions. - */ - assert !command.isLimitedToOnePartition(); - - /* - * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. - * - * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false - * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). - */ - if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) - return null; - - /* - * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. - * There is no point to ask the replica for more rows - it has no more in the requested range. - */ - if (!partitionsFetched) - return null; - partitionsFetched = false; - - /* - * We are going to fetch one partition at a time for thrift and potentially more for CQL. - * The row limit will either be set to the per partition limit - if the command has no total row limit set, or - * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, - * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. - */ - int toQuery = command.limits().count() != DataLimits.NO_LIMIT - ? command.limits().count() - counted(mergedResultCounter) - : command.limits().perPartitionCount(); - - ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); - Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); - - PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery); - return executeReadCommand(cmd); - } - - // Counts the number of rows for regular queries and the number of groups for GROUP BY queries - private int counted(Counter counter) - { - return command.limits().isGroupByLimit() - ? counter.rowCounted() - : counter.counted(); - } - - private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery) - { - PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command; - - DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery); - - AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange(); - AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight() - ? 
new Range<>(lastPartitionKey, bounds.right) - : new ExcludingBounds<>(lastPartitionKey, bounds.right); - DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); - - return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange); - } - - private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator> - { - private final TableMetadata metadata; - private final DecoratedKey partitionKey; - - private Clustering lastClustering; // clustering of the last observed row - - private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows - private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command) - private int lastQueried = 0; // # extra rows requested from the replica last time - - private ShortReadRowsProtection(TableMetadata metadata, DecoratedKey partitionKey) - { - this.metadata = metadata; - this.partitionKey = partitionKey; - } - - @Override - public Row applyToRow(Row row) - { - lastClustering = row.clustering(); - return row; - } - - /* - * We only get here once all the rows in this iterator have been iterated over, and so if the node - * had returned the requested number of rows but we still get here, then some results were skipped - * during reconciliation. - */ - public UnfilteredRowIterator moreContents() - { - // never try to request additional rows from replicas if our reconciled partition is already filled to the limit - assert !mergedResultCounter.isDoneForPartition(); - - // we do not apply short read protection when we have no limits at all - assert !command.limits().isUnlimited(); - - /* - * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more. - * - * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false - * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). - */ - if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) - return null; - - /* - * If the replica has no live rows in the partition, don't try to fetch more. - * - * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't - * always cover this scenario: - * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], - * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. - * - * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch - * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only - * have tombstones in the current partition. - * - * One other way we can hit this condition is when the partition only has a live static row and no regular - * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after - * the moreContents() call. - */ - if (countedInCurrentPartition(singleResultCounter) == 0) - return null; - - /* - * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. - * We already have the row, so there is no point in asking for more from the partition. 
- */ - if (Clustering.EMPTY == lastClustering) - return null; - - lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted; - lastCounted = countedInCurrentPartition(singleResultCounter); - - // getting back fewer rows than we asked for means the partition on the replica has been fully consumed - if (lastQueried > 0 && lastFetched < lastQueried) - return null; - - /* - * At this point we know that: - * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more - * rows in the partition - * 2. at least one of those returned rows was shadowed by a tombstone returned from another - * replica - * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to - * avoid a short read - * - * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b - * are defined as follows: - * [a] limits.count() - mergedResultCounter.counted() - * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() - * - * It would be naive to query for exactly that many rows, as it's possible and not unlikely - * that some of the returned rows would also be shadowed by tombstones from other hosts. - * - * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; - * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. - * - * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number - * of rows fetched: there is a high transactional cost for every individual request, but a relatively low - * marginal cost for each extra row requested. - * - * As such it's better to overfetch than to underfetch extra rows from a host; but at the same - * time we want to respect paging limits and not blow up spectacularly. - * - * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only - * counts. - * - * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits. - * - * See CASSANDRA-13794 for more details. - */ - lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount()); - - ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark(); - Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); - - SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried); - return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd); - } - - // Counts the number of rows for regular queries and the number of groups for GROUP BY queries - private int countedInCurrentPartition(Counter counter) - { - return command.limits().isGroupByLimit() - ? 
counter.rowCountedInCurrentPartition() - : counter.countedInCurrentPartition(); - } - - private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) - { - ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); - if (null != lastClustering) - filter = filter.forPaging(metadata.comparator, lastClustering, false); - - return SinglePartitionReadCommand.create(command.metadata(), - command.nowInSec(), - command.columnFilter(), - command.rowFilter(), - command.limits().forShortReadRetry(toQuery), - partitionKey, - filter, - command.indexMetadata()); - } - } - - private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd) - { - DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime); - ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime); - - if (StorageProxy.canDoLocalRequest(source)) - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); - else - MessagingService.instance().sendRRWithFailure(cmd.createMessage(), source, handler); - - // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. - handler.awaitResults(); - assert resolver.responses.size() == 1; - return resolver.responses.get(0).payload.makeIterator(command); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestMismatchException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestMismatchException.java b/src/java/org/apache/cassandra/service/DigestMismatchException.java deleted file mode 100644 index 18d5939..0000000 --- a/src/java/org/apache/cassandra/service/DigestMismatchException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.utils.ByteBufferUtil; - -@SuppressWarnings("serial") -public class DigestMismatchException extends Exception -{ - public DigestMismatchException(DecoratedKey key, ByteBuffer digest1, ByteBuffer digest2) - { - super(String.format("Mismatch for key %s (%s vs %s)", - key.toString(), - ByteBufferUtil.bytesToHex(digest1), - ByteBufferUtil.bytesToHex(digest2))); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/39807ba4/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java deleted file mode 100644 index 6a528e9..0000000 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; -import org.apache.cassandra.net.MessageIn; - -public class DigestResolver extends ResponseResolver -{ - private volatile ReadResponse dataResponse; - - public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) - { - super(keyspace, command, consistency, maxResponseCount); - } - - @Override - public void preprocess(MessageIn<ReadResponse> message) - { - super.preprocess(message); - if (dataResponse == null && !message.payload.isDigestResponse()) - dataResponse = message.payload; - } - - /** - * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground - */ - public PartitionIterator getData() - { - assert isDataPresent(); - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); - } - - /* - * This method handles two different scenarios: - * - * a) we're handling the initial read of data from the closest replica + digests - * from the rest. In this case we check the digests against each other, - * throw an exception if there is a mismatch, otherwise return the data row. - * - * b) we're checking additional digests that arrived after the minimum to handle - * the requested ConsistencyLevel, i.e. 
asynchronous read repair check - */ - public PartitionIterator resolve() throws DigestMismatchException - { - if (responses.size() == 1) - return getData(); - - if (logger.isTraceEnabled()) - logger.trace("resolving {} responses", responses.size()); - - compareResponses(); - - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); - } - - public void compareResponses() throws DigestMismatchException - { - long start = System.nanoTime(); - - // validate digests against each other; throw immediately on mismatch. - ByteBuffer digest = null; - for (MessageIn<ReadResponse> message : responses) - { - ReadResponse response = message.payload; - - ByteBuffer newDigest = response.digest(command); - if (digest == null) - digest = newDigest; - else if (!digest.equals(newDigest)) - // rely on the fact that only single partition queries use digests - throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest); - } - - if (logger.isTraceEnabled()) - logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - } - - public boolean isDataPresent() - { - return dataResponse != null; - } -}
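----------------------------------------------------------------------
The partition-level moreContents() removed above boils down, once its guard checks pass, to a single sizing rule for the follow-up request. Below is a minimal, self-contained sketch of that rule; all names are illustrative (NO_LIMIT stands in for DataLimits.NO_LIMIT, and the class is not part of Cassandra):

    // Hypothetical model of the toQuery computation in ShortReadPartitionsProtection.moreContents().
    public final class ShortReadPartitionSizingSketch
    {
        static final int NO_LIMIT = Integer.MAX_VALUE; // stand-in for DataLimits.NO_LIMIT

        // With a total row limit, request only what the reconciled result is still missing;
        // with no total limit, fall back to the per-partition limit.
        static int rowsToQuery(int totalRowLimit, int perPartitionLimit, int mergedCounted)
        {
            return totalRowLimit != NO_LIMIT
                 ? totalRowLimit - mergedCounted
                 : perPartitionLimit;
        }

        public static void main(String[] args)
        {
            System.out.println(rowsToQuery(100, NO_LIMIT, 42)); // LIMIT 100, 42 rows reconciled -> ask for 58
            System.out.println(rowsToQuery(NO_LIMIT, 10, 0));   // only PER PARTITION LIMIT 10 -> ask for 10
        }
    }

As the original comment notes, under-fetching here is not fatal: partitions that still come back short are topped up by later row-level moreContents() calls.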
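The row-level moreContents() in ShortReadRowsProtection makes the analogous per-partition decision: a chain of early exits followed by a deliberately generous request size. A simplified, self-contained sketch of that decision logic follows; the parameters mirror the counter and clustering state tracked above, but none of these names are real Cassandra APIs:

    // Hypothetical model of the "ask for more rows?" decision in ShortReadRowsProtection.moreContents().
    public final class ShortReadRowSizingSketch
    {
        static final int NO_LIMIT = Integer.MAX_VALUE; // stand-in for DataLimits.NO_LIMIT

        // Returns 0 when no follow-up request should be made, otherwise the number of extra rows to ask for.
        static int extraRowsToRequest(boolean limitSatisfiedForPartition, // singleResultCounter.isDoneForPartition()
                                      int perPartitionLimit,              // command.limits().perPartitionCount()
                                      int countedInCurrentPartition,      // rows/groups seen from this replica in the partition
                                      boolean lastRowHadEmptyClustering,  // Clustering.EMPTY == lastClustering
                                      int lastQueried,
                                      int lastFetched,
                                      int totalRowLimit)                  // command.limits().count()
        {
            // The replica did not fill even the original limit, and no per-partition limit
            // is set (the CASSANDRA-13911 caveat): it has nothing more to give.
            if (!limitSatisfiedForPartition && perPartitionLimit == NO_LIMIT)
                return 0;

            // No live rows at all in this partition on the replica.
            if (countedInCurrentPartition == 0)
                return 0;

            // Table without clustering columns: at most one row per partition, and we already have it.
            if (lastRowHadEmptyClustering)
                return 0;

            // The previous retry came back short, so the replica's partition is exhausted.
            if (lastQueried > 0 && lastFetched < lastQueried)
                return 0;

            // Over-fetch rather than under-fetch: one generous request is cheaper than
            // several precise ones (see CASSANDRA-13794).
            return Math.min(totalRowLimit, perPartitionLimit);
        }

        public static void main(String[] args)
        {
            System.out.println(extraRowsToRequest(true, 10, 7, false, 10, 7, 100)); // short reply last time -> 0
            System.out.println(extraRowsToRequest(true, 10, 7, false, 0, 0, 100));  // min(100, 10) -> 10
        }
    }

Requesting min(count(), perPartitionCount()) instead of the exact shortfall trades a few surplus rows per response for fewer round trips, which is the trade-off the comment above (and CASSANDRA-13794) argues for.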
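Finally, DigestResolver.compareResponses() is a fail-fast scan: every digest is compared against the first one seen, and any disagreement aborts the read with a mismatch. A self-contained sketch using plain ByteBuffers; DigestMismatch is a hypothetical stand-in for DigestMismatchException (the real exception also carries the partition key, as the deleted class above shows):

    import java.nio.ByteBuffer;
    import java.util.List;

    // Hypothetical, simplified model of the digest comparison loop in DigestResolver.compareResponses().
    public final class DigestComparisonSketch
    {
        static final class DigestMismatch extends Exception
        {
            DigestMismatch(ByteBuffer expected, ByteBuffer actual)
            {
                super(String.format("Mismatch (%s vs %s)", toHex(expected), toHex(actual)));
            }
        }

        // Compare every digest against the first one; throw immediately on the first mismatch.
        static void compareDigests(List<ByteBuffer> digests) throws DigestMismatch
        {
            ByteBuffer first = null;
            for (ByteBuffer digest : digests)
            {
                if (first == null)
                    first = digest;
                else if (!first.equals(digest))
                    throw new DigestMismatch(first, digest);
            }
        }

        static String toHex(ByteBuffer buffer)
        {
            StringBuilder sb = new StringBuilder();
            for (int i = buffer.position(); i < buffer.limit(); i++)
                sb.append(String.format("%02x", buffer.get(i) & 0xff));
            return sb.toString();
        }

        public static void main(String[] args) throws DigestMismatch
        {
            ByteBuffer a = ByteBuffer.wrap(new byte[]{ 1, 2, 3 });
            ByteBuffer b = ByteBuffer.wrap(new byte[]{ 1, 2, 4 });
            compareDigests(List.of(a, a.duplicate())); // identical digests: no exception
            try
            {
                compareDigests(List.of(a, b));         // differing digests
            }
            catch (DigestMismatch e)
            {
                System.out.println(e.getMessage());    // prints "Mismatch (010203 vs 010204)"
            }
        }
    }

A CL.ONE read never hits this path in the foreground: with a single response, resolve() returns the data directly via getData(), so a digest mismatch can only surface once multiple replicas have been consulted.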