Repository: cassandra Updated Branches: refs/heads/trunk c683e4ff6 -> c277fc56b (forced update)
Transient Replication support for EACH_QUORUM, and correction of behaviour for LOCAL_QUORUM patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14727 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c277fc56 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c277fc56 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c277fc56 Branch: refs/heads/trunk Commit: c277fc56b586d7c6db1f0d42fd2253f5484ca3d8 Parents: d8164c6 Author: Benedict Elliott Smith <bened...@apple.com> Authored: Wed Sep 19 12:52:27 2018 +0100 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Nov 30 17:06:32 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ConsistencyLevel.java | 18 ++- .../apache/cassandra/locator/ReplicaPlans.java | 50 +++++--- .../org/apache/cassandra/locator/Replicas.java | 24 +++- .../locator/ReplicaCollectionTest.java | 42 +------ .../cassandra/locator/ReplicaLayoutTest.java | 2 +- .../cassandra/locator/ReplicaPlansTest.java | 120 +++++++++++++++++++ .../apache/cassandra/locator/ReplicaUtils.java | 63 ++++++++++ .../WriteResponseHandlerTransientTest.java | 12 +- 9 files changed, 265 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f145a06..c147eca 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727) * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832) * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896) * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms (CASSANDRA-11939) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 9e884a7..4973915 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -27,6 +27,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.transport.ProtocolException; +import static org.apache.cassandra.locator.Replicas.addToCountPerDc; import static org.apache.cassandra.locator.Replicas.countInOurDc; public enum ConsistencyLevel @@ -92,7 +93,12 @@ public enum ConsistencyLevel : quorumFor(keyspace); } - public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace) + public static int localQuorumForOurDc(Keyspace keyspace) + { + return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); + } + + public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size()); @@ -101,6 +107,13 @@ public enum ConsistencyLevel return perDc; } + public static ObjectIntOpenHashMap<String> eachQuorumForWrite(Keyspace keyspace, Endpoints<?> pendingWithDown) + { + ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace); + addToCountPerDc(perDc, pendingWithDown, 1); + return perDc; + } + public int blockFor(Keyspace keyspace) { switch (this) @@ -121,7 +134,7 @@ public enum ConsistencyLevel return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; case LOCAL_QUORUM: case LOCAL_SERIAL: - return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter()); + return localQuorumForOurDc(keyspace); case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { @@ -164,6 +177,7 @@ public enum ConsistencyLevel /** * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace + * WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM) */ public boolean satisfies(ConsistencyLevel other, Keyspace keyspace) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/ReplicaPlans.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index 5551e5f..a6fe53f 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -19,6 +19,7 @@ package org.apache.cassandra.locator; import com.carrotsearch.hppc.ObjectIntOpenHashMap; +import com.carrotsearch.hppc.cursors.ObjectIntCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; @@ -57,9 +58,13 @@ import java.util.function.Function; import java.util.function.Predicate; import static com.google.common.collect.Iterables.any; +import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM; -import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor; +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead; +import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite; import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor; +import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc; +import static org.apache.cassandra.locator.Replicas.addToCountPerDc; import static org.apache.cassandra.locator.Replicas.countInOurDc; import static org.apache.cassandra.locator.Replicas.countPerDc; @@ -77,7 +82,7 @@ public class ReplicaPlans case LOCAL_ONE: return countInOurDc(liveReplicas).hasAtleast(1, 1); case LOCAL_QUORUM: - return countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1); + return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1); case EACH_QUORUM: if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { @@ -143,7 +148,7 @@ public class ReplicaPlans Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters(); for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive)) { - int dcBlockFor = ConsistencyLevel.localQuorumFor(keyspace, entry.key); + int dcBlockFor = localQuorumFor(keyspace, entry.key); Replicas.ReplicaCount dcCount = entry.value; if (!dcCount.hasAtleast(dcBlockFor, 0)) throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas()); @@ -340,7 +345,7 @@ public class ReplicaPlans public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException { - EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live); + EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live); assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending()); return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); } @@ -348,20 +353,20 @@ public class ReplicaPlans public interface Selector { <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live); + E select(Keyspace keyspace, L liveAndDown, L live); } /** * Select all nodes, transient or otherwise, as targets for the operation. * - * This is may no longer be useful until we finish implementing transient replication support, however + * This is may no longer be useful once we finish implementing transient replication support, however * it can be of value to stipulate that a location writes to all nodes without regard to transient status. */ public static final Selector writeAll = new Selector() { @Override public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) + E select(Keyspace keyspace, L liveAndDown, L live) { return liveAndDown.all(); } @@ -380,22 +385,33 @@ public class ReplicaPlans { @Override public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>> - E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live) + E select(Keyspace keyspace, L liveAndDown, L live) { if (!any(liveAndDown.all(), Replica::isTransient)) return liveAndDown.all(); - assert consistencyLevel != EACH_QUORUM; - ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size()); - contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull)); + contacts.addAll(filter(liveAndDown.natural(), Replica::isFull)); contacts.addAll(liveAndDown.pending()); - // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all) - int liveCount = contacts.count(live.all()::contains); - int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount; - if (requiredTransientCount > 0) - contacts.addAll(live.natural().filterLazily(Replica::isTransient, requiredTransientCount)); + /** + * Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every DC, + * regardless of how many responses we need to wait for and our requested consistencyLevel. + * This is to minimally surprise users with transient replication; with normal writes, we + * soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node; + * even if we don't wait for ACK, we have in both cases sent sufficient messages. + */ + ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending()); + addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1); + addToCountPerDc(requiredPerDc, live.pending(), -1); + + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : filter(live.natural(), Replica::isTransient)) + { + String dc = snitch.getDatacenter(replica); + if (requiredPerDc.addTo(dc, -1) >= 0) + contacts.add(replica); + } return contacts.build(); } }; @@ -453,7 +469,7 @@ public class ReplicaPlans private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace keyspace, E candidates) { assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy; - ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace); + ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace); final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); return candidates.filter(replica -> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/Replicas.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java index 6c80134..9e6048a 100644 --- a/src/java/org/apache/cassandra/locator/Replicas.java +++ b/src/java/org/apache/cassandra/locator/Replicas.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.function.Predicate; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.carrotsearch.hppc.ObjectObjectOpenHashMap; import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; @@ -84,21 +85,38 @@ public class Replicas return count; } - public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String> dataCenters, Iterable<Replica> liveReplicas) + /** + * count the number of full and transient replicas, separately, for each DC + */ + public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String> dataCenters, Iterable<Replica> replicas) { ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new ObjectObjectOpenHashMap<>(dataCenters.size()); for (String dc: dataCenters) perDc.put(dc, new ReplicaCount()); - for (Replica replica : liveReplicas) + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : replicas) { - String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica); + String dc = snitch.getDatacenter(replica); perDc.get(dc).increment(replica); } return perDc; } /** + * increment each of the map's DC entries for each matching replica provided + */ + public static void addToCountPerDc(ObjectIntOpenHashMap<String> perDc, Iterable<Replica> replicas, int add) + { + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + for (Replica replica : replicas) + { + String dc = snitch.getDatacenter(replica); + perDc.addTo(dc, add); + } + } + + /** * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future */ public static void temporaryAssertFull(Replica replica) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java index ced49e2..e2d4797 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java @@ -46,51 +46,11 @@ import static com.google.common.collect.Iterables.*; import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.locator.ReplicaUtils.*; public class ReplicaCollectionTest { - static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP; - static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE; - static final List<InetAddressAndPort> ALL_EP; - static final List<Range<Token>> ALL_R; - static - { - try - { - EP1 = InetAddressAndPort.getByName("127.0.0.1"); - EP2 = InetAddressAndPort.getByName("127.0.0.2"); - EP3 = InetAddressAndPort.getByName("127.0.0.3"); - EP4 = InetAddressAndPort.getByName("127.0.0.4"); - EP5 = InetAddressAndPort.getByName("127.0.0.5"); - BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort(); - NULL_EP = InetAddressAndPort.getByName("127.255.255.255"); - R1 = range(0, 1); - R2 = range(1, 2); - R3 = range(2, 3); - R4 = range(3, 4); - R5 = range(4, 0); - BROADCAST_RANGE = range(10, 11); - NULL_RANGE = range(10000, 10001); - ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP); - ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - - static Token tk(long t) - { - return new Murmur3Partitioner.LongToken(t); - } - - static Range<Token> range(long left, long right) - { - return new Range<>(tk(left), tk(right)); - } - static class TestCase<C extends AbstractReplicaCollection<C>> { final boolean isBuilder; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java index 9f2ac58..b5b60e3 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java @@ -23,7 +23,7 @@ import org.apache.cassandra.dht.Token; import org.junit.Assert; import org.junit.Test; -import static org.apache.cassandra.locator.ReplicaCollectionTest.*; +import static org.apache.cassandra.locator.ReplicaUtils.*; public class ReplicaLayoutTest { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java new file mode 100644 index 0000000..4d0dd47 --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.ReplicaUtils.*; + +public class ReplicaPlansTest +{ + + static + { + DatabaseDescriptor.daemonInitialization(); + } + + static class Snitch extends AbstractNetworkTopologySnitch + { + final Set<InetAddressAndPort> dc1; + Snitch(Set<InetAddressAndPort> dc1) + { + this.dc1 = dc1; + } + @Override + public String getRack(InetAddressAndPort endpoint) + { + return dc1.contains(endpoint) ? "R1" : "R2"; + } + + @Override + public String getDatacenter(InetAddressAndPort endpoint) + { + return dc1.contains(endpoint) ? "DC1" : "DC2"; + } + } + + private static Keyspace ks(Set<InetAddressAndPort> dc1, Map<String, String> replication) + { + replication = ImmutableMap.<String, String>builder().putAll(replication).put("class", "NetworkTopologyStrategy").build(); + Keyspace keyspace = Keyspace.mockKS(KeyspaceMetadata.create("blah", KeyspaceParams.create(false, replication))); + Snitch snitch = new Snitch(dc1); + DatabaseDescriptor.setEndpointSnitch(snitch); + keyspace.getReplicationStrategy().snitch = snitch; + return keyspace; + } + + private static Replica full(InetAddressAndPort ep) { return fullReplica(ep, R1); } + + + + @Test + public void testWriteEachQuorum() + { + IEndpointSnitch stash = DatabaseDescriptor.getEndpointSnitch(); + final Token token = tk(1L); + try + { + { + // all full natural + Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3", "DC2", "3")); + EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2), full(EP3), full(EP4), full(EP5), full(EP6)); + EndpointsForToken pending = EndpointsForToken.empty(token); + ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal); + assertEquals(natural, plan.liveAndDown); + assertEquals(natural, plan.live); + assertEquals(natural, plan.contacts()); + } + { + // all natural and up, one transient in each DC + Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3", "DC2", "3")); + EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6)); + EndpointsForToken pending = EndpointsForToken.empty(token); + ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal); + assertEquals(natural, plan.liveAndDown); + assertEquals(natural, plan.live); + EndpointsForToken expectContacts = EndpointsForToken.of(token, full(EP1), full(EP2), full(EP4), full(EP5)); + assertEquals(expectContacts, plan.contacts()); + } + } + finally + { + DatabaseDescriptor.setEndpointSnitch(stash); + } + + { + // test simple + + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java index c5350dc..72c0a06 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java +++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java @@ -18,11 +18,17 @@ package org.apache.cassandra.locator; +import com.google.common.collect.ImmutableList; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.FBUtilities; +import org.junit.Assert; + +import java.net.UnknownHostException; +import java.util.List; import static org.apache.cassandra.locator.Replica.fullReplica; import static org.apache.cassandra.locator.Replica.transientReplica; @@ -51,4 +57,61 @@ public class ReplicaUtils { return transientReplica(endpoint, new Range<>(token, token)); } + + static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, EP6, EP7, EP8, EP9, BROADCAST_EP, NULL_EP; + static final Range<Token> R1, R2, R3, R4, R5, R6, R7, R8, R9, BROADCAST_RANGE, NULL_RANGE, WRAP_RANGE; + static final List<InetAddressAndPort> ALL_EP; + static final List<Range<Token>> ALL_R; + + static + { + try + { + EP1 = InetAddressAndPort.getByName("127.0.0.1"); + EP2 = InetAddressAndPort.getByName("127.0.0.2"); + EP3 = InetAddressAndPort.getByName("127.0.0.3"); + EP4 = InetAddressAndPort.getByName("127.0.0.4"); + EP5 = InetAddressAndPort.getByName("127.0.0.5"); + EP6 = InetAddressAndPort.getByName("127.0.0.6"); + EP7 = InetAddressAndPort.getByName("127.0.0.7"); + EP8 = InetAddressAndPort.getByName("127.0.0.8"); + EP9 = InetAddressAndPort.getByName("127.0.0.9"); + BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort(); + NULL_EP = InetAddressAndPort.getByName("127.255.255.255"); + R1 = range(0, 1); + R2 = range(1, 2); + R3 = range(2, 3); + R4 = range(3, 4); + R5 = range(4, 5); + R6 = range(5, 6); + R7 = range(6, 7); + R8 = range(7, 8); + R9 = range(8, 9); + BROADCAST_RANGE = range(10, 11); + NULL_RANGE = range(10000, 10001); + WRAP_RANGE = range(100000, 0); + ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP); + ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE, WRAP_RANGE); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + static Range<Token> range(long left, long right) + { + return new Range<>(tk(left), tk(right)); + } + + static void assertEquals(AbstractReplicaCollection<?> a, AbstractReplicaCollection<?> b) + { + Assert.assertEquals(a.list, b.list); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java index d31d3f1..15fbd27 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java @@ -173,9 +173,15 @@ public class WriteResponseHandlerTransientTest private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> livePredicate) { ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate); - Assert.assertTrue(Iterables.elementsEqual(expected.pending(), actual.pending())); - Assert.assertTrue(Iterables.elementsEqual(expected.live(), actual.live())); - Assert.assertTrue(Iterables.elementsEqual(expected.contacts(), actual.contacts())); + assertEquals(expected.pending(), actual.pending()); + assertEquals(expected.live(), actual.live()); + assertEquals(expected.contacts(), actual.contacts()); + } + + private static void assertEquals(ReplicaCollection<?> a, ReplicaCollection<?> b) + { + if (!Iterables.elementsEqual(a, b)) + Assert.assertTrue(a + " vs " + b, false); } private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org