Repository: cassandra
Updated Branches:
  refs/heads/trunk 42c92b976 -> 467068d1e
Transient node receives full data requests patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-14762 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/467068d1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/467068d1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/467068d1 Branch: refs/heads/trunk Commit: 467068d1e9d84e6cca1f9dd5a4eff5f80d027c2e Parents: 42c92b9 Author: Benedict Elliott Smith <bened...@apple.com> Authored: Thu Sep 20 18:56:38 2018 +0100 Committer: Benedict Elliott Smith <bened...@apple.com> Committed: Wed Oct 3 14:27:21 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../service/reads/AbstractReadExecutor.java | 3 ++- .../reads/ShortReadPartitionsProtection.java | 6 +++++ .../reads/repair/AbstractReadRepair.java | 27 +++++++++++++++----- .../reads/repair/BlockingReadRepairTest.java | 2 +- .../DiagEventsBlockingReadRepairTest.java | 2 +- .../reads/repair/ReadOnlyReadRepairTest.java | 2 +- 7 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e89c1c5..ae321e5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Transient node receives full data requests (CASSANDRA-14762) * Enable snapshot artifacts publish (CASSANDRA-12704) * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770) * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735) http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 8d0f14c..8f7bc26 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -274,7 +274,7 @@ public abstract class AbstractReadExecutor speculated = true; ReplicaPlan.ForTokenRead replicaPlan = replicaPlan(); - ReadCommand retryCommand = command; + ReadCommand retryCommand; Replica extraReplica; if (handler.resolver.isDataPresent()) { @@ -290,6 +290,7 @@ public abstract class AbstractReadExecutor else { extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull); + retryCommand = command; if (extraReplica == null) { cfs.metric.speculativeInsufficientReplicas.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index 2e4440f..7b7c4d3 100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@ -182,9 +182,15 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); if (source.isSelf()) + { StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + } else + { + if (source.isTransient()) + cmd = cmd.copyAsTransientQuery(source); MessagingService.instance().sendRRWithFailure(cmd.createMessage(), 
source.endpoint(), handler); + } // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. handler.awaitResults(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 1b213ff..b74f8d3 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -84,8 +84,24 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli return replicaPlan.get(); } - void sendReadCommand(Replica to, ReadCallback readCallback) + void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) { + ReadCommand command = this.command; + if (to.isTransient()) + { + // It's OK to send queries to transient nodes during RR, as we may have contacted them for their data request initially + // So long as we don't use these to generate repair mutations, we're fine, and this is enforced by requiring + // ReadOnlyReadRepair for transient keyspaces. + command = command.copyAsTransientQuery(to); + } + + if (Tracing.isTracing()) + { + String type; + if (speculative) type = to.isFull() ? "speculative full" : "speculative transient"; + else type = to.isFull() ? 
"full" : "transient"; + Tracing.trace("Enqueuing {} data read to {}", type, to); + } MessageOut<ReadCommand> message = command.createMessage(); // if enabled, request additional info about repaired data from any full replicas if (command.isTrackingRepairedStatus() && to.isFull()) @@ -112,10 +128,8 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli command.trackRepairedStatus(); for (Replica replica : replicaPlan().contacts()) - { - Tracing.trace("Enqueuing full data read to {}", replica); - sendReadCommand(replica, readCallback); - } + sendReadCommand(replica, readCallback, false); + ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver); } @@ -153,8 +167,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends Repli return; replicaPlan.addToContacts(uncontacted); - Tracing.trace("Enqueuing speculative full data read to {}", uncontacted); - sendReadCommand(uncontacted, repair.readCallback); + sendReadCommand(uncontacted, repair.readCallback, true); ReadRepairMetrics.speculatedRead.mark(); ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index 34bbf32..6ea593d 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -91,7 +91,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest ReadCallback readCallback = null; @Override - void sendReadCommand(Replica to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback, 
boolean speculative) { assert readCallback == null || readCallback == callback; readCommandRecipients.add(to.endpoint()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index 2471ffd..befa07a 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -142,7 +142,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Assert.assertNotNull(e.toMap()); } - void sendReadCommand(Replica to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback, boolean speculative) { assert readCallback == null || readCallback == callback; readCallback = callback; http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java index cf12265..cc7fdf1 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java @@ -49,7 +49,7 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest ReadCallback readCallback = null; @Override - void sendReadCommand(Replica to, ReadCallback callback) + void sendReadCommand(Replica to, ReadCallback callback, boolean speculative) { assert readCallback == 
null || readCallback == callback; readCommandRecipients.add(to.endpoint());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org