Repository: cassandra Updated Branches: refs/heads/trunk 57b87d21a -> 59de35332
Add a check for receiving digest response from transient node Patch by Alex Petrov; reviewed by Benedict Elliot Smith for CASSANDRA-14750 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59de3533 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59de3533 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59de3533 Branch: refs/heads/trunk Commit: 59de353325768b6bb8f4dc18a1a2ace5071f8f84 Parents: 57b87d2 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Wed Sep 12 23:20:34 2018 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Fri Sep 21 15:11:39 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/PartitionRangeReadCommand.java | 7 +-- .../org/apache/cassandra/db/ReadCommand.java | 49 +++++++++++++++++++- .../db/SinglePartitionReadCommand.java | 10 +++- .../apache/cassandra/service/StorageProxy.java | 2 +- .../service/reads/AbstractReadExecutor.java | 8 ++-- .../cassandra/service/reads/DigestResolver.java | 6 --- .../service/reads/ResponseResolver.java | 4 ++ .../apache/cassandra/db/ReadCommandTest.java | 46 ++++++++++++++++++ .../apache/cassandra/locator/ReplicaUtils.java | 10 ++++ 10 files changed, 125 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 76b7b8b..f9d2f3c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add a check for receiving digest response from transient node (CASSANDRA-14750) * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704) * Remove mentions of transient replication from repair path (CASSANDRA-14698) * Fix handleRepairStatusChangedNotification to remove first then 
add (CASSANDRA-14720) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 79db18a..7928039 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -21,7 +21,6 @@ import java.io.IOException; import com.google.common.annotations.VisibleForTesting; -import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; @@ -173,7 +172,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR indexMetadata()); } - public PartitionRangeReadCommand copyAsDigestQuery() + @Override + protected PartitionRangeReadCommand copyAsDigestQuery() { return new PartitionRangeReadCommand(true, digestVersion(), @@ -187,7 +187,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR indexMetadata()); } - public PartitionRangeReadCommand copyAsTransientQuery() + @Override + protected PartitionRangeReadCommand copyAsTransientQuery() { return new PartitionRangeReadCommand(false, 0, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index e146b8a..ada4ae6 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -25,6 +25,7 @@ import java.util.function.LongPredicate; import javax.annotation.Nullable; +import 
com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.hash.Hasher; @@ -49,6 +50,8 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.schema.IndexMetadata; @@ -142,6 +145,9 @@ public abstract class ReadCommand extends AbstractReadQuery IndexMetadata index) { super(metadata, nowInSec, columnFilter, rowFilter, limits); + if (acceptsTransient && isDigestQuery) + throw new IllegalArgumentException("Attempted to issue a digest response to transient replica"); + this.kind = kind; this.isDigestQuery = isDigestQuery; this.digestVersion = digestVersion; @@ -308,10 +314,49 @@ public abstract class ReadCommand extends AbstractReadQuery public abstract ReadCommand copy(); /** + * Returns a copy of this command with acceptsTransient set to true. + */ + public ReadCommand copyAsTransientQuery(Replica replica) + { + Preconditions.checkArgument(replica.isTransient(), + "Can't make a transient request on a full replica: " + replica); + return copyAsTransientQuery(); + } + + /** + * Returns a copy of this command with acceptsTransient set to true. + */ + public ReadCommand copyAsTransientQuery(ReplicaCollection<?> replicas) + { + if (Iterables.any(replicas, Replica::isFull)) + throw new IllegalArgumentException("Can't make a transient request on full replicas: " + replicas.filter(Replica::isFull)); + return copyAsTransientQuery(); + } + + protected abstract ReadCommand copyAsTransientQuery(); + + /** * Returns a copy of this command with isDigestQuery set to true. 
*/ - public abstract ReadCommand copyAsDigestQuery(); - public abstract ReadCommand copyAsTransientQuery(); + public ReadCommand copyAsDigestQuery(Replica replica) + { + Preconditions.checkArgument(replica.isFull(), + "Can't make a digest request on a transient replica " + replica); + return copyAsDigestQuery(); + } + + /** + * Returns a copy of this command with isDigestQuery set to true. + */ + public ReadCommand copyAsDigestQuery(ReplicaCollection<?> replicas) + { + if (Iterables.any(replicas, Replica::isTransient)) + throw new IllegalArgumentException("Can't make a digest request on a transient replica " + replicas.filter(Replica::isTransient)); + + return copyAsDigestQuery(); + } + + protected abstract ReadCommand copyAsDigestQuery(); protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index e99a487..b763217 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.cassandra.cache.IRowCacheEntry; @@ -40,6 +42,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReadsListener; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.Replica; 
+import org.apache.cassandra.locator.ReplicaCollection; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -294,7 +298,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar indexMetadata()); } - public SinglePartitionReadCommand copyAsDigestQuery() + @Override + protected SinglePartitionReadCommand copyAsDigestQuery() { return new SinglePartitionReadCommand(true, digestVersion(), @@ -309,7 +314,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar indexMetadata()); } - public SinglePartitionReadCommand copyAsTransientQuery() + @Override + protected SinglePartitionReadCommand copyAsTransientQuery() { return new SinglePartitionReadCommand(false, 0, http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index fc49330..0d52afa 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -2107,7 +2107,7 @@ public class StorageProxy implements StorageProxyMBean for (Replica replica : replicaPlan.contacts()) { Tracing.trace("Enqueuing request to {}", replica); - PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(); + ReadCommand command = replica.isFull() ? 
rangeCommand : rangeCommand.copyAsTransientQuery(replica); MessageOut<ReadCommand> message = command.createMessage(); if (command.isTrackingRepairedStatus() && replica.isFull()) message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index c296cba..6881a2f 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -119,14 +119,14 @@ public abstract class AbstractReadExecutor protected void makeTransientDataRequests(ReplicaCollection<?> replicas) { - makeRequests(command.copyAsTransientQuery(), replicas); + makeRequests(command.copyAsTransientQuery(replicas), replicas); } protected void makeDigestRequests(ReplicaCollection<?> replicas) { assert all(replicas, Replica::isFull); // only send digest requests to full replicas, send data requests instead to the transient replicas - makeRequests(command.copyAsDigestQuery(), replicas); + makeRequests(command.copyAsDigestQuery(replicas), replicas); } private void makeRequests(ReadCommand readCommand, ReplicaCollection<?> replicas) @@ -284,8 +284,8 @@ public abstract class AbstractReadExecutor assert extraReplica != null; retryCommand = extraReplica.isTransient() - ? command.copyAsTransientQuery() - : command.copyAsDigestQuery(); + ? 
command.copyAsTransientQuery(extraReplica) + : command.copyAsDigestQuery(extraReplica); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index 0dcae95..899baf9 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -57,13 +57,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea super.preprocess(message); Replica replica = replicaPlan().getReplicaFor(message.from); if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull()) - { dataResponse = message; - } - else if (replica.isTransient() && message.payload.isDigestResponse()) - { - throw new IllegalStateException("digest response received from transient replica"); - } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/src/java/org/apache/cassandra/service/reads/ResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java index 0c1e1ba..aaead84 100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@ -55,6 +55,10 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica public void preprocess(MessageIn<ReadResponse> message) { + if (replicaPlan().getReplicaFor(message.from).isTransient() && + message.payload.isDigestResponse()) + throw new IllegalArgumentException("Digest response received from transient replica"); + try { 
responses.add(message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/db/ReadCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 8df7651..fba2bf2 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -52,7 +52,9 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; @@ -681,6 +683,50 @@ public class ReadCommandTest assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount()); } + @Test (expected = IllegalArgumentException.class) + public void copyFullAsTransientTest() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + readCommand.copyAsTransientQuery(ReplicaUtils.full(FBUtilities.getBroadcastAddressAndPort())); + } + + @Test (expected = IllegalArgumentException.class) + public void copyTransientAsDigestQuery() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build(); + readCommand.copyAsDigestQuery(ReplicaUtils.trans(FBUtilities.getBroadcastAddressAndPort())); + } + + @Test (expected = IllegalArgumentException.class) + public void copyMultipleFullAsTransientTest() + { + ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + DecoratedKey key = Util.dk("key"); + Token token = key.getToken(); + // Address is unimportant for this test + InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); + ReadCommand readCommand = Util.cmd(cfs, key).build(); + readCommand.copyAsTransientQuery(EndpointsForToken.of(token, + ReplicaUtils.trans(addr, token), + ReplicaUtils.full(addr, token))); + } + + @Test (expected = IllegalArgumentException.class) + public void copyMultipleTransientAsDigestQuery() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6); + DecoratedKey key = Util.dk("key"); + Token token = key.getToken(); + // Address is unimportant for this test + InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); + ReadCommand readCommand = Util.cmd(cfs, key).build(); + readCommand.copyAsDigestQuery(EndpointsForToken.of(token, + ReplicaUtils.trans(addr, token), + ReplicaUtils.full(addr, token))); + } + private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException { cfs.truncateBlocking(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59de3533/test/unit/org/apache/cassandra/locator/ReplicaUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java index 66f538f..c5350dc 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java @@ -41,4 +41,14 @@ public class ReplicaUtils { return transientReplica(endpoint, FULL_RANGE); } + + public static Replica full(InetAddressAndPort endpoint, Token token) + { + return fullReplica(endpoint, new Range<>(token, token)); + } + + public static Replica trans(InetAddressAndPort endpoint, Token token) + { + return transientReplica(endpoint, new Range<>(token, token)); + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org