Repository: cassandra
Updated Branches:
  refs/heads/trunk 42c92b976 -> 467068d1e


Transient node receives full data requests

patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-14762


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/467068d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/467068d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/467068d1

Branch: refs/heads/trunk
Commit: 467068d1e9d84e6cca1f9dd5a4eff5f80d027c2e
Parents: 42c92b9
Author: Benedict Elliott Smith <bened...@apple.com>
Authored: Thu Sep 20 18:56:38 2018 +0100
Committer: Benedict Elliott Smith <bened...@apple.com>
Committed: Wed Oct 3 14:27:21 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../service/reads/AbstractReadExecutor.java     |  3 ++-
 .../reads/ShortReadPartitionsProtection.java    |  6 +++++
 .../reads/repair/AbstractReadRepair.java        | 27 +++++++++++++++-----
 .../reads/repair/BlockingReadRepairTest.java    |  2 +-
 .../DiagEventsBlockingReadRepairTest.java       |  2 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |  2 +-
 7 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e89c1c5..ae321e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Transient node receives full data requests (CASSANDRA-14762)
  * Enable snapshot artifacts publish (CASSANDRA-12704)
  * Introduce RangesAtEndpoint.unwrap to simplify 
StreamSession.addTransferRanges (CASSANDRA-14770)
  * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead 
of Unavailable (CASSANDRA-14735)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 8d0f14c..8f7bc26 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -274,7 +274,7 @@ public abstract class AbstractReadExecutor
                 speculated = true;
 
                 ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
-                ReadCommand retryCommand = command;
+                ReadCommand retryCommand;
                 Replica extraReplica;
                 if (handler.resolver.isDataPresent())
                 {
@@ -290,6 +290,7 @@ public abstract class AbstractReadExecutor
                 else
                 {
                     extraReplica = 
replicaPlan.firstUncontactedCandidate(Replica::isFull);
+                    retryCommand = command;
                     if (extraReplica == null)
                     {
                         cfs.metric.speculativeInsufficientReplicas.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 2e4440f..7b7c4d3 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -182,9 +182,15 @@ public class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowI
         ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, 
replicaPlan, queryStartNanoTime);
 
         if (source.isSelf())
+        {
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
+        }
         else
+        {
+            if (source.isTransient())
+                cmd = cmd.copyAsTransientQuery(source);
             MessagingService.instance().sendRRWithFailure(cmd.createMessage(), 
source.endpoint(), handler);
+        }
 
         // We don't call handler.get() because we want to preserve tombstones 
since we're still in the middle of merging node results.
         handler.awaitResults();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 1b213ff..b74f8d3 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -84,8 +84,24 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, P extends Repli
         return replicaPlan.get();
     }
 
-    void sendReadCommand(Replica to, ReadCallback readCallback)
+    void sendReadCommand(Replica to, ReadCallback readCallback, boolean 
speculative)
     {
+        ReadCommand command = this.command;
+        if (to.isTransient())
+        {
+            // It's OK to send queries to transient nodes during RR, as we may 
have contacted them for their data request initially
+            // So long as we don't use these to generate repair mutations, 
we're fine, and this is enforced by requiring
+            // ReadOnlyReadRepair for transient keyspaces.
+            command = command.copyAsTransientQuery(to);
+        }
+
+        if (Tracing.isTracing())
+        {
+            String type;
+            if (speculative) type = to.isFull() ? "speculative full" : 
"speculative transient";
+            else type = to.isFull() ? "full" : "transient";
+            Tracing.trace("Enqueuing {} data read to {}", type, to);
+        }
         MessageOut<ReadCommand> message = command.createMessage();
         // if enabled, request additional info about repaired data from any 
full replicas
         if (command.isTrackingRepairedStatus() && to.isFull())
@@ -112,10 +128,8 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, P extends Repli
             command.trackRepairedStatus();
 
         for (Replica replica : replicaPlan().contacts())
-        {
-            Tracing.trace("Enqueuing full data read to {}", replica);
-            sendReadCommand(replica, readCallback);
-        }
+            sendReadCommand(replica, readCallback, false);
+
         ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
     }
 
@@ -153,8 +167,7 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, P extends Repli
                 return;
 
             replicaPlan.addToContacts(uncontacted);
-            Tracing.trace("Enqueuing speculative full data read to {}", 
uncontacted);
-            sendReadCommand(uncontacted, repair.readCallback);
+            sendReadCommand(uncontacted, repair.readCallback, true);
             ReadRepairMetrics.speculatedRead.mark();
             ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), 
replicaPlan());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 34bbf32..6ea593d 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -91,7 +91,7 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(Replica to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback, boolean 
speculative)
         {
             assert readCallback == null || readCallback == callback;
             readCommandRecipients.add(to.endpoint());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index 2471ffd..befa07a 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -142,7 +142,7 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
             Assert.assertNotNull(e.toMap());
         }
 
-        void sendReadCommand(Replica to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback, boolean 
speculative)
         {
             assert readCallback == null || readCallback == callback;
             readCallback = callback;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/467068d1/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index cf12265..cc7fdf1 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -49,7 +49,7 @@ public class ReadOnlyReadRepairTest extends 
AbstractReadRepairTest
         ReadCallback readCallback = null;
 
         @Override
-        void sendReadCommand(Replica to, ReadCallback callback)
+        void sendReadCommand(Replica to, ReadCallback callback, boolean 
speculative)
         {
             assert readCallback == null || readCallback == callback;
             readCommandRecipients.add(to.endpoint());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to