Repository: cassandra
Updated Branches:
  refs/heads/trunk 57b87d21a -> 59de35332


Add a check for receiving digest response from transient node

Patch by Alex Petrov; reviewed by Benedict Elliot Smith for CASSANDRA-14750


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59de3533
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59de3533
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59de3533

Branch: refs/heads/trunk
Commit: 59de353325768b6bb8f4dc18a1a2ace5071f8f84
Parents: 57b87d2
Author: Alex Petrov <oleksandr.pet...@gmail.com>
Authored: Wed Sep 12 23:20:34 2018 +0200
Committer: Alex Petrov <oleksandr.pet...@gmail.com>
Committed: Fri Sep 21 15:11:39 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  7 +--
 .../org/apache/cassandra/db/ReadCommand.java    | 49 +++++++++++++++++++-
 .../db/SinglePartitionReadCommand.java          | 10 +++-
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/reads/AbstractReadExecutor.java     |  8 ++--
 .../cassandra/service/reads/DigestResolver.java |  6 ---
 .../service/reads/ResponseResolver.java         |  4 ++
 .../apache/cassandra/db/ReadCommandTest.java    | 46 ++++++++++++++++++
 .../apache/cassandra/locator/ReplicaUtils.java  | 10 ++++
 10 files changed, 125 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 76b7b8b..f9d2f3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add a check for receiving digest response from transient node 
(CASSANDRA-14750)
  * Fail query on transient replica if coordinator only expects full data 
(CASSANDRA-14704)
  * Remove mentions of transient replication from repair path (CASSANDRA-14698)
  * Fix handleRepairStatusChangedNotification to remove first then add 
(CASSANDRA-14720)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 79db18a..7928039 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
@@ -173,7 +172,8 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
                                              indexMetadata());
     }
 
-    public PartitionRangeReadCommand copyAsDigestQuery()
+    @Override
+    protected PartitionRangeReadCommand copyAsDigestQuery()
     {
         return new PartitionRangeReadCommand(true,
                                              digestVersion(),
@@ -187,7 +187,8 @@ public class PartitionRangeReadCommand extends ReadCommand 
implements PartitionR
                                              indexMetadata());
     }
 
-    public PartitionRangeReadCommand copyAsTransientQuery()
+    @Override
+    protected PartitionRangeReadCommand copyAsTransientQuery()
     {
         return new PartitionRangeReadCommand(false,
                                              0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index e146b8a..ada4ae6 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -25,6 +25,7 @@ import java.util.function.LongPredicate;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hasher;
@@ -49,6 +50,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.schema.IndexMetadata;
@@ -142,6 +145,9 @@ public abstract class ReadCommand extends AbstractReadQuery
                           IndexMetadata index)
     {
         super(metadata, nowInSec, columnFilter, rowFilter, limits);
+        if (acceptsTransient && isDigestQuery)
+            throw new IllegalArgumentException("Attempted to issue a digest 
response to transient replica");
+
         this.kind = kind;
         this.isDigestQuery = isDigestQuery;
         this.digestVersion = digestVersion;
@@ -308,10 +314,49 @@ public abstract class ReadCommand extends 
AbstractReadQuery
     public abstract ReadCommand copy();
 
     /**
+     * Returns a copy of this command with acceptsTransient set to true.
+     */
+    public ReadCommand copyAsTransientQuery(Replica replica)
+    {
+        Preconditions.checkArgument(replica.isTransient(),
+                                    "Can't make a transient request on a full 
replica: " + replica);
+        return copyAsTransientQuery();
+    }
+
+    /**
+     * Returns a copy of this command with acceptsTransient set to true.
+     */
+    public ReadCommand copyAsTransientQuery(ReplicaCollection<?> replicas)
+    {
+        if (Iterables.any(replicas, Replica::isFull))
+            throw new IllegalArgumentException("Can't make a transient request 
on full replicas: " + replicas.filter(Replica::isFull));
+        return copyAsTransientQuery();
+    }
+
+    protected abstract ReadCommand copyAsTransientQuery();
+
+    /**
      * Returns a copy of this command with isDigestQuery set to true.
      */
-    public abstract ReadCommand copyAsDigestQuery();
-    public abstract ReadCommand copyAsTransientQuery();
+    public ReadCommand copyAsDigestQuery(Replica replica)
+    {
+        Preconditions.checkArgument(replica.isFull(),
+                                    "Can't make a digest request on a 
transient replica " + replica);
+        return copyAsDigestQuery();
+    }
+
+    /**
+     * Returns a copy of this command with isDigestQuery set to true.
+     */
+    public ReadCommand copyAsDigestQuery(ReplicaCollection<?> replicas)
+    {
+        if (Iterables.any(replicas, Replica::isTransient))
+            throw new IllegalArgumentException("Can't make a digest request on 
a transient replica " + replicas.filter(Replica::isTransient));
+
+        return copyAsDigestQuery();
+    }
+
+    protected abstract ReadCommand copyAsDigestQuery();
 
     protected abstract UnfilteredPartitionIterator 
queryStorage(ColumnFamilyStore cfs, ReadExecutionController 
executionController);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index e99a487..b763217 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.cache.IRowCacheEntry;
@@ -40,6 +42,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -294,7 +298,8 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
                                               indexMetadata());
     }
 
-    public SinglePartitionReadCommand copyAsDigestQuery()
+    @Override
+    protected SinglePartitionReadCommand copyAsDigestQuery()
     {
         return new SinglePartitionReadCommand(true,
                                               digestVersion(),
@@ -309,7 +314,8 @@ public class SinglePartitionReadCommand extends ReadCommand 
implements SinglePar
                                               indexMetadata());
     }
 
-    public SinglePartitionReadCommand copyAsTransientQuery()
+    @Override
+    protected SinglePartitionReadCommand copyAsTransientQuery()
     {
         return new SinglePartitionReadCommand(false,
                                               0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index fc49330..0d52afa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2107,7 +2107,7 @@ public class StorageProxy implements StorageProxyMBean
                 for (Replica replica : replicaPlan.contacts())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
-                    PartitionRangeReadCommand command = replica.isFull() ? 
rangeCommand : rangeCommand.copyAsTransientQuery();
+                    ReadCommand command = replica.isFull() ? rangeCommand : 
rangeCommand.copyAsTransientQuery(replica);
                     MessageOut<ReadCommand> message = command.createMessage();
                     if (command.isTrackingRepairedStatus() && replica.isFull())
                         message = 
message.withParameter(ParameterType.TRACK_REPAIRED_DATA, 
MessagingService.ONE_BYTE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index c296cba..6881a2f 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -119,14 +119,14 @@ public abstract class AbstractReadExecutor
 
     protected void makeTransientDataRequests(ReplicaCollection<?> replicas)
     {
-        makeRequests(command.copyAsTransientQuery(), replicas);
+        makeRequests(command.copyAsTransientQuery(replicas), replicas);
     }
 
     protected void makeDigestRequests(ReplicaCollection<?> replicas)
     {
         assert all(replicas, Replica::isFull);
         // only send digest requests to full replicas, send data requests 
instead to the transient replicas
-        makeRequests(command.copyAsDigestQuery(), replicas);
+        makeRequests(command.copyAsDigestQuery(replicas), replicas);
     }
 
     private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> 
replicas)
@@ -284,8 +284,8 @@ public abstract class AbstractReadExecutor
                     assert extraReplica != null;
 
                     retryCommand = extraReplica.isTransient()
-                            ? command.copyAsTransientQuery()
-                            : command.copyAsDigestQuery();
+                            ? command.copyAsTransientQuery(extraReplica)
+                            : command.copyAsDigestQuery(extraReplica);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java 
b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 0dcae95..899baf9 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -57,13 +57,7 @@ public class DigestResolver<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRea
         super.preprocess(message);
         Replica replica = replicaPlan().getReplicaFor(message.from);
         if (dataResponse == null && !message.payload.isDigestResponse() && 
replica.isFull())
-        {
             dataResponse = message;
-        }
-        else if (replica.isTransient() && message.payload.isDigestResponse())
-        {
-            throw new IllegalStateException("digest response received from 
transient replica");
-        }
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java 
b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 0c1e1ba..aaead84 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -55,6 +55,10 @@ public abstract class ResponseResolver<E extends 
Endpoints<E>, P extends Replica
 
     public void preprocess(MessageIn<ReadResponse> message)
     {
+        if (replicaPlan().getReplicaFor(message.from).isTransient() &&
+            message.payload.isDigestResponse())
+            throw new IllegalArgumentException("Digest response received from 
transient replica");
+
         try
         {
             responses.add(message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 8df7651..fba2bf2 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -52,7 +52,9 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
@@ -681,6 +683,50 @@ public class ReadCommandTest
         assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
     }
 
+    @Test (expected = IllegalArgumentException.class)
+    public void copyFullAsTransientTest()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        
readCommand.copyAsTransientQuery(ReplicaUtils.full(FBUtilities.getBroadcastAddressAndPort()));
+    }
+
+    @Test (expected = IllegalArgumentException.class)
+    public void copyTransientAsDigestQuery()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+        
readCommand.copyAsDigestQuery(ReplicaUtils.trans(FBUtilities.getBroadcastAddressAndPort()));
+    }
+
+    @Test (expected = IllegalArgumentException.class)
+    public void copyMultipleFullAsTransientTest()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        DecoratedKey key = Util.dk("key");
+        Token token = key.getToken();
+        // Address is unimportant for this test
+        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+        ReadCommand readCommand = Util.cmd(cfs, key).build();
+        readCommand.copyAsTransientQuery(EndpointsForToken.of(token,
+                                                              
ReplicaUtils.trans(addr, token),
+                                                              
ReplicaUtils.full(addr, token)));
+    }
+
+    @Test (expected = IllegalArgumentException.class)
+    public void copyMultipleTransientAsDigestQuery()
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
+        DecoratedKey key = Util.dk("key");
+        Token token = key.getToken();
+        // Address is unimportant for this test
+        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+        ReadCommand readCommand = Util.cmd(cfs, key).build();
+        readCommand.copyAsDigestQuery(EndpointsForToken.of(token,
+                                                           
ReplicaUtils.trans(addr, token),
+                                                           
ReplicaUtils.full(addr, token)));
+    }
+
     private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand 
readCommand) throws IOException
     {
         cfs.truncateBlocking();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java 
b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
index 66f538f..c5350dc 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -41,4 +41,14 @@ public class ReplicaUtils
     {
         return transientReplica(endpoint, FULL_RANGE);
     }
+
+    public static Replica full(InetAddressAndPort endpoint, Token token)
+    {
+        return fullReplica(endpoint, new Range<>(token, token));
+    }
+
+    public static Replica trans(InetAddressAndPort endpoint, Token token)
+    {
+        return transientReplica(endpoint, new Range<>(token, token));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to