Repository: cassandra Updated Branches: refs/heads/trunk f100024eb -> 8a73427c6
Remove mentions of transient replication from repair path Patch by Alex Petrov; reviewed by Blake Eggleston and Ariel Weisberg for CASSANDRA-14698 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a73427c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a73427c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a73427c Branch: refs/heads/trunk Commit: 8a73427c6543c94ce49da0ed1f833ec5b8ed4f18 Parents: f100024 Author: Alex Petrov <oleksandr.pet...@gmail.com> Authored: Thu Sep 6 12:37:13 2018 +0200 Committer: Alex Petrov <oleksandr.pet...@gmail.com> Committed: Wed Sep 12 16:56:57 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../service/reads/AbstractReadExecutor.java | 2 +- .../cassandra/service/reads/DataResolver.java | 13 +- .../cassandra/service/reads/DigestResolver.java | 8 +- .../service/reads/ResponseResolver.java | 8 +- .../reads/repair/AbstractReadRepair.java | 4 +- .../reads/repair/ReadRepairDiagnostics.java | 14 +- .../reads/repair/RowIteratorMergeListener.java | 44 ++-- .../reads/DataResolverTransientTest.java | 226 ------------------- .../service/reads/DigestResolverTest.java | 35 ++- 10 files changed, 73 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3cfdcff..264c80f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 4.0 - * fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720) + * Remove mentions of transient replication from repair path (CASSANDRA-14698) + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720) * Allow transient node to serve as a repair coordinator (CASSANDRA-14693) * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696) * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 5543fcc..75885ae 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -81,7 +81,7 @@ public abstract class AbstractReadExecutor this.replicaLayout = replicaLayout; this.initialDataRequestCount = initialDataRequestCount; this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime); - this.digestResolver = new DigestResolver<>(command, replicaLayout, readRepair, queryStartNanoTime); + this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime); this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime); this.cfs = cfs; this.traceState = Tracing.instance.get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java index 1f69d6a..a6901b2 100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@ -50,15 +50,18 @@ import org.apache.cassandra.service.reads.repair.RepairedDataTracker; import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; import static com.google.common.collect.Iterables.*; +import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener; public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L> { private final boolean enforceStrictLiveness; + private final ReadRepair<E, L> readRepair; public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime) { - super(command, replicaLayout, readRepair, queryStartNanoTime); + super(command, replicaLayout, queryStartNanoTime); this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); + this.readRepair = readRepair; } public PartitionIterator getData() @@ -157,8 +160,14 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); } - private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker) + private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, + L sources, + RepairedDataTracker repairedDataTracker) { + // Avoid wrapping no-op listeners as it doesn't throw + if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP) + return partitionListener; + return new UnfilteredPartitionIterators.MergeListener() { public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index c3eee43..28c2117 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -44,9 +44,9 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L { private volatile MessageIn<ReadResponse> dataResponse; - public DigestResolver(ReadCommand command, L replicas, ReadRepair<E, L> readRepair, long queryStartNanoTime) + public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime) { - super(command, replicas, readRepair, queryStartNanoTime); + super(command, replicas, queryStartNanoTime); Preconditions.checkArgument(command instanceof SinglePartitionReadCommand, "DigestResolver can only be used with SinglePartitionReadCommand commands"); } @@ -95,11 +95,11 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L // transient replica response still contains data, which needs to be reconciled. DataResolver<E, L> dataResolver = new DataResolver<>(command, replicaLayout, - (ReadRepair<E, L>) NoopReadRepair.instance, + NoopReadRepair.instance, queryStartNanoTime); dataResolver.preprocess(dataResponse); - // Forward differences to all full nodes + // Reconcile with transient replicas for (MessageIn<ReadResponse> response : responses) { Replica replica = replicaLayout.getReplicaFor(response.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/ResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java index e306b4d..298f843 100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@ -25,7 +25,6 @@ import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.utils.concurrent.Accumulator; public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> @@ -34,17 +33,15 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica protected final ReadCommand command; protected final L replicaLayout; - protected final ReadRepair<E, L> readRepair; // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints protected final Accumulator<MessageIn<ReadResponse>> responses; protected final long queryStartNanoTime; - public ResponseResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime) + public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime) { this.command = command; this.replicaLayout = replicaLayout; - this.readRepair = readRepair; // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes) this.responses = new Accumulator<>(replicaLayout.all().size()); this.queryStartNanoTime = queryStartNanoTime; @@ -60,7 +57,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica } catch (IllegalStateException e) { - logger.error("Encountered error while trying to preprocess the message {}: %s in command {}, replicas: {}", message, command, readRepair, replicaLayout.consistencyLevel(), replicaLayout.selected()); + logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}", + message, command, replicaLayout); throw e; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 493b9d0..528d31b 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -110,7 +110,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli Tracing.trace("Enqueuing full data read to {}", replica); sendReadCommand(replica, readCallback); } - ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints()); + ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver); } public void awaitReads() throws ReadTimeoutException @@ -150,7 +150,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli Tracing.trace("Enqueuing speculative full data read to {}", replica); sendReadCommand(replica, repair.readCallback); ReadRepairMetrics.speculatedRead.mark(); - ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints()); + ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java index 6eff395..4c74a89 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java @@ -18,15 +18,14 @@ package org.apache.cassandra.service.reads.repair; -import java.util.Collection; import java.util.Collections; -import java.util.List; import com.google.common.collect.Lists; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.diag.DiagnosticEventService; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.DigestResolver; import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType; import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; @@ -39,21 +38,22 @@ final class ReadRepairDiagnostics { } - static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations, - DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints) + static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver) { if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR)) service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR, - readRepair, endpointDestinations, allEndpoints, digestResolver)); + readRepair, + layout.selected().endpoints(), + layout.all().endpoints(), digestResolver)); } static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint, - Iterable<InetAddressAndPort> allEndpoints) + ReplicaLayout<?, ?> replicaLayout) { if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ)) service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ, readRepair, Collections.singletonList(endpoint), - Lists.newArrayList(allEndpoints), null)); + Lists.newArrayList(replicaLayout.all().endpoints()), null)); } static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index b0c019a..7fe797a 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -58,7 +58,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis private final ConsistencyLevel consistency; private final PartitionUpdate.Builder[] repairs; - private final Replica[] sources; private final Row.Builder[] currentRows; private final RowDiffListener diffListener; private final ReplicaLayout layout; @@ -79,16 +78,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis this.partitionKey = partitionKey; this.columns = columns; this.isReversed = isReversed; - Endpoints<?> sources = layout.selected(); - this.sources = new Replica[sources.size()]; - for (int i = 0; i < sources.size(); i++) - this.sources[i] = sources.get(i); - this.layout = layout; - repairs = new PartitionUpdate.Builder[sources.size()]; - currentRows = new Row.Builder[sources.size()]; - sourceDeletionTime = new DeletionTime[sources.size()]; - markerToRepair = new ClusteringBound[sources.size()]; + int size = layout.selected().size(); + repairs = new PartitionUpdate.Builder[size]; + currentRows = new Row.Builder[size]; + sourceDeletionTime = new DeletionTime[size]; + markerToRepair = new ClusteringBound[size]; this.command = command; this.consistency = consistency; this.readRepair = readRepair; @@ -97,25 +92,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis { public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) { - if (merged != null && !merged.equals(original) && !isTransient(i)) + if (merged != null && !merged.equals(original)) currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged); } public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) { - if (merged != null && !merged.equals(original) && !isTransient(i)) + if (merged != null && !merged.equals(original)) currentRow(i, clustering).addRowDeletion(merged); } public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original) { - if (merged != null && !merged.equals(original) && !isTransient(i)) + if (merged != null && !merged.equals(original)) currentRow(i, clustering).addComplexDeletion(column, merged); } public void onCell(int i, Clustering clustering, Cell merged, Cell original) { - if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i)) + if (merged != null && !merged.equals(original) && isQueried(merged)) currentRow(i, clustering).addCell(merged); } @@ -134,11 +129,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis }; } - private boolean isTransient(int i) - { - return sources[i].isTransient(); - } - private PartitionUpdate.Builder update(int i) { if (repairs[i] == null) @@ -172,9 +162,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis this.partitionLevelDeletion = mergedDeletion; for (int i = 0; i < versions.length; i++) { - if (isTransient(i)) - continue; - if (mergedDeletion.supersedes(versions[i])) update(i).addPartitionDeletion(mergedDeletion); } @@ -209,9 +196,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis for (int i = 0; i < versions.length; i++) { - if (isTransient(i)) - continue; - RangeTombstoneMarker marker = versions[i]; // Update what the source now thinks is the current deletion @@ -326,20 +310,22 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis public void close() { Map<Replica, Mutation> mutations = null; + Endpoints<?> sources = layout.selected(); for (int i = 0; i < repairs.length; i++) { if (repairs[i] == null) continue; - Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas"); - Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false); + Replica source = sources.get(i); + + Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false); if (mutation == null) continue; if (mutations == null) - mutations = Maps.newHashMapWithExpectedSize(sources.length); + mutations = Maps.newHashMapWithExpectedSize(sources.size()); - mutations.put(sources[i], mutation); + mutations.put(source, mutation); } if (mutations != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java deleted file mode 100644 index 8119400..0000000 --- a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.reads; - -import java.util.concurrent.TimeUnit; - -import com.google.common.primitives.Ints; - -import org.apache.cassandra.Util; -import org.apache.cassandra.db.DecoratedKey; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.EmptyIterators; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.SimpleBuilders; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.Slice; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.BTreeRow; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.locator.EndpointsForToken; -import org.apache.cassandra.locator.ReplicaLayout; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.reads.repair.TestableReadRepair; -import org.apache.cassandra.utils.ByteBufferUtil; - -import static org.apache.cassandra.db.ConsistencyLevel.QUORUM; -import static org.apache.cassandra.locator.Replica.fullReplica; -import static org.apache.cassandra.locator.Replica.transientReplica; -import static org.apache.cassandra.locator.ReplicaUtils.full; -import static org.apache.cassandra.locator.ReplicaUtils.trans; - -/** - * Tests DataResolvers handing of transient replicas - */ -public class DataResolverTransientTest extends AbstractReadResponseTest -{ - private static DecoratedKey key; - - @Before - public void setUp() - { - key = Util.dk("key1"); - } - - private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows) - { - PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false); - for (Row row: rows) - { - builder.add(row); - } - return builder; - } - - private static PartitionUpdate.Builder update(Row... rows) - { - return update(cfm, "key1", rows); - } - - private static Row.SimpleBuilder rowBuilder(int clustering) - { - return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering)); - } - - private static Row row(long timestamp, int clustering, int value) - { - return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build(); - } - - private static DeletionTime deletion(long timeMillis) - { - TimeUnit MILLIS = TimeUnit.MILLISECONDS; - return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis))); - } - - /** - * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica - */ - private void assertNoTransientRepairs(PartitionUpdate update) - { - SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key); - EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); - TestableReadRepair repair = new TestableReadRepair(command, QUORUM); - DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0); - - Assert.assertFalse(resolver.isDataPresent()); - resolver.preprocess(response(command, EP1, iter(update), false)); - resolver.preprocess(response(command, EP2, iter(update), false)); - resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false)); - - Assert.assertFalse(repair.dataWasConsumed()); - assertPartitionsEqual(filter(iter(update)), resolver.resolve()); - Assert.assertTrue(repair.dataWasConsumed()); - Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty()); - } - - @Test - public void emptyRowRepair() - { - assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build()); - } - - @Test - public void emptyPartitionDeletionRepairs() - { - PartitionUpdate.Builder builder = update(); - builder.addPartitionDeletion(deletion(1999)); - assertNoTransientRepairs(builder.build()); - } - - /** - * Partition level deletion responses shouldn't sent data to a transient replica - */ - @Test - public void emptyRowDeletionRepairs() - { - PartitionUpdate.Builder builder = update(); - builder.add(rowBuilder(1).timestamp(1999).delete().build()); - assertNoTransientRepairs(builder.build()); - } - - @Test - public void emptyComplexDeletionRepair() - { - - long[] ts = {1000, 2000}; - - Row.Builder builder = BTreeRow.unsortedBuilder(); - builder.newRow(Clustering.EMPTY); - builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); - assertNoTransientRepairs(update(cfm2, "key", builder.build()).build()); - - } - - @Test - public void emptyRangeTombstoneRepairs() - { - Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b"))); - PartitionUpdate.Builder builder = update(); - builder.add(new RangeTombstone(slice, deletion(2000))); - assertNoTransientRepairs(builder.build()); - } - - /** - * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas - */ - @Test - public void fullRepairsIgnoreTransientReplicas() - { - SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); - EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); - TestableReadRepair repair = new TestableReadRepair(command, QUORUM); - DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); - - Assert.assertFalse(resolver.isDataPresent()); - resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false)); - resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false)); - resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false)); - - Assert.assertFalse(repair.dataWasConsumed()); - - consume(resolver.resolve()); - - Assert.assertTrue(repair.dataWasConsumed()); - - Assert.assertTrue(repair.sent.containsKey(EP1)); - Assert.assertTrue(repair.sent.containsKey(EP2)); - Assert.assertFalse(repair.sent.containsKey(EP3)); - } - - /** - * If the transient replica has new data, the full replicas shoould be repaired, the transient one should not - */ - @Test - public void transientMismatchesRepairFullReplicas() - { - SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); - EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); - TestableReadRepair<?, ?> repair = new TestableReadRepair(command, QUORUM); - DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); - - Assert.assertFalse(resolver.isDataPresent()); - PartitionUpdate transData = update(row(1000, 5, 5)).build(); - resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false)); - resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false)); - resolver.preprocess(response(command, EP3, iter(transData), false)); - - Assert.assertFalse(repair.dataWasConsumed()); - - assertPartitionsEqual(filter(iter(transData)), resolver.resolve()); - - Assert.assertTrue(repair.dataWasConsumed()); - - assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm)))); - assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm)))); - Assert.assertFalse(repair.sent.containsKey(EP3)); - - } - - private ReplicaLayout.ForToken plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel) - { - return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a73427c/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java index 5306a74..8454d6a 100644 --- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java +++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java @@ -64,8 +64,7 @@ public class DigestResolverTest extends AbstractReadResponseTest { SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2)); - TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); - DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0); PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build(); @@ -83,7 +82,7 @@ public class DigestResolverTest extends AbstractReadResponseTest { SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2)); - DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance,0); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0); PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build(); PartitionUpdate response2 = update(row(2000, 4, 5)).build(); @@ -104,8 +103,7 @@ public class DigestResolverTest extends AbstractReadResponseTest { SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2)); - TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); - DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0); + DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0); PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build(); PartitionUpdate response2 = update(row(1000, 5, 5)).build(); @@ -116,7 +114,6 @@ public class DigestResolverTest extends AbstractReadResponseTest Assert.assertTrue(resolver.isDataPresent()); Assert.assertTrue(resolver.responsesMatch()); Assert.assertTrue(resolver.hasTransientResponse()); - Assert.assertTrue(readRepair.sent.isEmpty()); } /** @@ -127,7 +124,7 @@ public class DigestResolverTest extends AbstractReadResponseTest { SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2)); - DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance, 0); + DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0); PartitionUpdate response2 = update(row(1000, 5, 5)).build(); Assert.assertFalse(resolver.isDataPresent()); @@ -137,6 +134,30 @@ public class DigestResolverTest extends AbstractReadResponseTest Assert.assertTrue(resolver.hasTransientResponse()); } + @Test + public void transientResponseData() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2), trans(EP3)); + DigestResolver<?, ?> resolver = new DigestResolver<>(command, plan(ConsistencyLevel.QUORUM, targetReplicas), 0); + + PartitionUpdate fullResponse = update(row(1000, 1, 1)).build(); + PartitionUpdate digestResponse = update(row(1000, 1, 1)).build(); + PartitionUpdate transientResponse = update(row(1000, 2, 2)).build(); + Assert.assertFalse(resolver.isDataPresent()); + Assert.assertFalse(resolver.hasTransientResponse()); + resolver.preprocess(response(command, EP1, iter(fullResponse), false)); + Assert.assertTrue(resolver.isDataPresent()); + resolver.preprocess(response(command, EP2, iter(digestResponse), true)); + resolver.preprocess(response(command, EP3, iter(transientResponse), false)); + Assert.assertTrue(resolver.hasTransientResponse()); + + assertPartitionsEqual(filter(iter(dk, + row(1000, 1, 1), + row(1000, 2, 2))), + resolver.getData()); + } + private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas) { return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org