Repository: cassandra
Updated Branches:
  refs/heads/trunk c683e4ff6 -> c277fc56b (forced update)


Transient Replication support for EACH_QUORUM, and correction of behaviour for 
LOCAL_QUORUM

patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for 
CASSANDRA-14727


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c277fc56
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c277fc56
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c277fc56

Branch: refs/heads/trunk
Commit: c277fc56b586d7c6db1f0d42fd2253f5484ca3d8
Parents: d8164c6
Author: Benedict Elliott Smith <bened...@apple.com>
Authored: Wed Sep 19 12:52:27 2018 +0100
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Fri Nov 30 17:06:32 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ConsistencyLevel.java   |  18 ++-
 .../apache/cassandra/locator/ReplicaPlans.java  |  50 +++++---
 .../org/apache/cassandra/locator/Replicas.java  |  24 +++-
 .../locator/ReplicaCollectionTest.java          |  42 +------
 .../cassandra/locator/ReplicaLayoutTest.java    |   2 +-
 .../cassandra/locator/ReplicaPlansTest.java     | 120 +++++++++++++++++++
 .../apache/cassandra/locator/ReplicaUtils.java  |  63 ++++++++++
 .../WriteResponseHandlerTransientTest.java      |  12 +-
 9 files changed, 265 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f145a06..c147eca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
  * BufferPool: allocating thread for new chunks should acquire directly 
(CASSANDRA-14832)
  * Send correct messaging version in internode messaging handshake's third 
message (CASSANDRA-14896)
  * Make Read and Write Latency columns consistent for proxyhistograms and 
tablehistograms (CASSANDRA-11939)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 9e884a7..4973915 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -27,6 +27,7 @@ import 
org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.transport.ProtocolException;
 
+import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 import static org.apache.cassandra.locator.Replicas.countInOurDc;
 
 public enum ConsistencyLevel
@@ -92,7 +93,12 @@ public enum ConsistencyLevel
              : quorumFor(keyspace);
     }
 
-    public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace)
+    public static int localQuorumForOurDc(Keyspace keyspace)
+    {
+        return localQuorumFor(keyspace, 
DatabaseDescriptor.getLocalDataCenter());
+    }
+
+    public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace 
keyspace)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
         ObjectIntOpenHashMap<String> perDc = new 
ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
@@ -101,6 +107,13 @@ public enum ConsistencyLevel
         return perDc;
     }
 
+    public static ObjectIntOpenHashMap<String> eachQuorumForWrite(Keyspace 
keyspace, Endpoints<?> pendingWithDown)
+    {
+        ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace);
+        addToCountPerDc(perDc, pendingWithDown, 1);
+        return perDc;
+    }
+
     public int blockFor(Keyspace keyspace)
     {
         switch (this)
@@ -121,7 +134,7 @@ public enum ConsistencyLevel
                 return 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
             case LOCAL_QUORUM:
             case LOCAL_SERIAL:
-                return localQuorumFor(keyspace, 
DatabaseDescriptor.getLocalDataCenter());
+                return localQuorumForOurDc(keyspace);
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
                 {
@@ -164,6 +177,7 @@ public enum ConsistencyLevel
 
     /**
      * Determine if this consistency level meets or exceeds the consistency 
requirements of the given cl for the given keyspace
+     * WARNING: this is not locality aware; you cannot safely use this with 
mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
      */
     public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java 
b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 5551e5f..a6fe53f 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.locator;
 
 import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectIntCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
@@ -57,9 +58,13 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
-import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor;
+import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead;
+import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite;
 import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
+import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc;
+import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 import static org.apache.cassandra.locator.Replicas.countInOurDc;
 import static org.apache.cassandra.locator.Replicas.countPerDc;
 
@@ -77,7 +82,7 @@ public class ReplicaPlans
             case LOCAL_ONE:
                 return countInOurDc(liveReplicas).hasAtleast(1, 1);
             case LOCAL_QUORUM:
-                return 
countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1);
+                return 
countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1);
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
                 {
@@ -143,7 +148,7 @@ public class ReplicaPlans
                     Collection<String> dcs = ((NetworkTopologyStrategy) 
keyspace.getReplicationStrategy()).getDatacenters();
                     for (ObjectObjectCursor<String, Replicas.ReplicaCount> 
entry : countPerDc(dcs, allLive))
                     {
-                        int dcBlockFor = 
ConsistencyLevel.localQuorumFor(keyspace, entry.key);
+                        int dcBlockFor = localQuorumFor(keyspace, entry.key);
                         Replicas.ReplicaCount dcCount = entry.value;
                         if (!dcCount.hasAtleast(dcBlockFor, 0))
                             throw 
UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, 
dcCount.allReplicas(), 0, dcCount.fullReplicas());
@@ -340,7 +345,7 @@ public class ReplicaPlans
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, 
ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
     {
-        EndpointsForToken contacts = selector.select(keyspace, 
consistencyLevel, liveAndDown, live);
+        EndpointsForToken contacts = selector.select(keyspace, liveAndDown, 
live);
         assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, 
live.all(), liveAndDown.pending());
         return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, 
liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
     }
@@ -348,20 +353,20 @@ public class ReplicaPlans
     public interface Selector
     {
         <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live);
+        E select(Keyspace keyspace, L liveAndDown, L live);
     }
 
     /**
      * Select all nodes, transient or otherwise, as targets for the operation.
      *
-     * This is may no longer be useful until we finish implementing transient 
replication support, however
+     * This is may no longer be useful once we finish implementing transient 
replication support, however
      * it can be of value to stipulate that a location writes to all nodes 
without regard to transient status.
      */
     public static final Selector writeAll = new Selector()
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live)
+        E select(Keyspace keyspace, L liveAndDown, L live)
         {
             return liveAndDown.all();
         }
@@ -380,22 +385,33 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L 
liveAndDown, L live)
+        E select(Keyspace keyspace, L liveAndDown, L live)
         {
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
 
-            assert consistencyLevel != EACH_QUORUM;
-
             ReplicaCollection.Builder<E> contacts = 
liveAndDown.all().newBuilder(liveAndDown.all().size());
-            
contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull));
+            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
             contacts.addAll(liveAndDown.pending());
 
-            // TODO: this doesn't correctly handle LOCAL_QUORUM (or 
EACH_QUORUM at all)
-            int liveCount = contacts.count(live.all()::contains);
-            int requiredTransientCount = 
consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
-            if (requiredTransientCount > 0)
-                
contacts.addAll(live.natural().filterLazily(Replica::isTransient, 
requiredTransientCount));
+            /**
+             * Per CASSANDRA-14768, we ensure we write to at least a QUORUM of 
nodes in every DC,
+             * regardless of how many responses we need to wait for and our 
requested consistencyLevel.
+             * This is to minimally surprise users with transient replication; 
with normal writes, we
+             * soft-ensure that we reach QUORUM in all DCs we are able to, by 
writing to every node;
+             * even if we don't wait for ACK, we have in both cases sent 
sufficient messages.
+              */
+            ObjectIntOpenHashMap<String> requiredPerDc = 
eachQuorumForWrite(keyspace, liveAndDown.pending());
+            addToCountPerDc(requiredPerDc, 
live.natural().filter(Replica::isFull), -1);
+            addToCountPerDc(requiredPerDc, live.pending(), -1);
+
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            for (Replica replica : filter(live.natural(), 
Replica::isTransient))
+            {
+                String dc = snitch.getDatacenter(replica);
+                if (requiredPerDc.addTo(dc, -1) >= 0)
+                    contacts.add(replica);
+            }
             return contacts.build();
         }
     };
@@ -453,7 +469,7 @@ public class ReplicaPlans
     private static <E extends Endpoints<E>> E 
contactForEachQuorumRead(Keyspace keyspace, E candidates)
     {
         assert keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy;
-        ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace);
+        ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace);
 
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         return candidates.filter(replica -> {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java 
b/src/java/org/apache/cassandra/locator/Replicas.java
index 6c80134..9e6048a 100644
--- a/src/java/org/apache/cassandra/locator/Replicas.java
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.function.Predicate;
 
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
 import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -84,21 +85,38 @@ public class Replicas
         return count;
     }
 
-    public static ObjectObjectOpenHashMap<String, ReplicaCount> 
countPerDc(Collection<String> dataCenters, Iterable<Replica> liveReplicas)
+    /**
+     * count the number of full and transient replicas, separately, for each DC
+     */
+    public static ObjectObjectOpenHashMap<String, ReplicaCount> 
countPerDc(Collection<String> dataCenters, Iterable<Replica> replicas)
     {
         ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new 
ObjectObjectOpenHashMap<>(dataCenters.size());
         for (String dc: dataCenters)
             perDc.put(dc, new ReplicaCount());
 
-        for (Replica replica : liveReplicas)
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        for (Replica replica : replicas)
         {
-            String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            String dc = snitch.getDatacenter(replica);
             perDc.get(dc).increment(replica);
         }
         return perDc;
     }
 
     /**
+     * increment each of the map's DC entries for each matching replica 
provided
+     */
+    public static void addToCountPerDc(ObjectIntOpenHashMap<String> perDc, 
Iterable<Replica> replicas, int add)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        for (Replica replica : replicas)
+        {
+            String dc = snitch.getDatacenter(replica);
+            perDc.addTo(dc, add);
+        }
+    }
+
+    /**
      * A placeholder for areas of the code that cannot yet handle transient 
replicas, but should do so in future
      */
     public static void temporaryAssertFull(Replica replica)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index ced49e2..e2d4797 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -46,51 +46,11 @@ import static com.google.common.collect.Iterables.*;
 import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
 
 public class ReplicaCollectionTest
 {
 
-    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, 
NULL_EP;
-    static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
-    static final List<InetAddressAndPort> ALL_EP;
-    static final List<Range<Token>> ALL_R;
-    static
-    {
-        try
-        {
-            EP1 = InetAddressAndPort.getByName("127.0.0.1");
-            EP2 = InetAddressAndPort.getByName("127.0.0.2");
-            EP3 = InetAddressAndPort.getByName("127.0.0.3");
-            EP4 = InetAddressAndPort.getByName("127.0.0.4");
-            EP5 = InetAddressAndPort.getByName("127.0.0.5");
-            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
-            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
-            R1 = range(0, 1);
-            R2 = range(1, 2);
-            R3 = range(2, 3);
-            R4 = range(3, 4);
-            R5 = range(4, 0);
-            BROADCAST_RANGE = range(10, 11);
-            NULL_RANGE = range(10000, 10001);
-            ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP);
-            ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE);
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    static Token tk(long t)
-    {
-        return new Murmur3Partitioner.LongToken(t);
-    }
-
-    static Range<Token> range(long left, long right)
-    {
-        return new Range<>(tk(left), tk(right));
-    }
-
     static class TestCase<C extends AbstractReplicaCollection<C>>
     {
         final boolean isBuilder;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
index 9f2ac58..b5b60e3 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.dht.Token;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.cassandra.locator.ReplicaCollectionTest.*;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
 
 public class ReplicaLayoutTest
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java 
b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
new file mode 100644
index 0000000..4d0dd47
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
+
+public class ReplicaPlansTest
+{
+
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    static class Snitch extends AbstractNetworkTopologySnitch
+    {
+        final Set<InetAddressAndPort> dc1;
+        Snitch(Set<InetAddressAndPort> dc1)
+        {
+            this.dc1 = dc1;
+        }
+        @Override
+        public String getRack(InetAddressAndPort endpoint)
+        {
+            return dc1.contains(endpoint) ? "R1" : "R2";
+        }
+
+        @Override
+        public String getDatacenter(InetAddressAndPort endpoint)
+        {
+            return dc1.contains(endpoint) ? "DC1" : "DC2";
+        }
+    }
+
+    private static Keyspace ks(Set<InetAddressAndPort> dc1, Map<String, 
String> replication)
+    {
+        replication = ImmutableMap.<String, 
String>builder().putAll(replication).put("class", 
"NetworkTopologyStrategy").build();
+        Keyspace keyspace = Keyspace.mockKS(KeyspaceMetadata.create("blah", 
KeyspaceParams.create(false, replication)));
+        Snitch snitch = new Snitch(dc1);
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+        keyspace.getReplicationStrategy().snitch = snitch;
+        return keyspace;
+    }
+
+    private static Replica full(InetAddressAndPort ep) { return 
fullReplica(ep, R1); }
+
+
+
+    @Test
+    public void testWriteEachQuorum()
+    {
+        IEndpointSnitch stash = DatabaseDescriptor.getEndpointSnitch();
+        final Token token = tk(1L);
+        try
+        {
+            {
+                // all full natural
+                Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), 
ImmutableMap.of("DC1", "3", "DC2", "3"));
+                EndpointsForToken natural = EndpointsForToken.of(token, 
full(EP1), full(EP2), full(EP3), full(EP4), full(EP5), full(EP6));
+                EndpointsForToken pending = EndpointsForToken.empty(token);
+                ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, 
ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), 
ReplicaPlans.writeNormal);
+                assertEquals(natural, plan.liveAndDown);
+                assertEquals(natural, plan.live);
+                assertEquals(natural, plan.contacts());
+            }
+            {
+                // all natural and up, one transient in each DC
+                Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), 
ImmutableMap.of("DC1", "3", "DC2", "3"));
+                EndpointsForToken natural = EndpointsForToken.of(token, 
full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6));
+                EndpointsForToken pending = EndpointsForToken.empty(token);
+                ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, 
ConsistencyLevel.EACH_QUORUM, natural, pending, Predicates.alwaysTrue(), 
ReplicaPlans.writeNormal);
+                assertEquals(natural, plan.liveAndDown);
+                assertEquals(natural, plan.live);
+                EndpointsForToken expectContacts = EndpointsForToken.of(token, 
full(EP1), full(EP2), full(EP4), full(EP5));
+                assertEquals(expectContacts, plan.contacts());
+            }
+        }
+        finally
+        {
+            DatabaseDescriptor.setEndpointSnitch(stash);
+        }
+
+        {
+            // test simple
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java 
b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
index c5350dc..72c0a06 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -18,11 +18,17 @@
 
 package org.apache.cassandra.locator;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Assert;
+
+import java.net.UnknownHostException;
+import java.util.List;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
@@ -51,4 +57,61 @@ public class ReplicaUtils
     {
         return transientReplica(endpoint, new Range<>(token, token));
     }
+
+    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, EP6, EP7, EP8, 
EP9, BROADCAST_EP, NULL_EP;
+    static final Range<Token> R1, R2, R3, R4, R5, R6, R7, R8, R9, 
BROADCAST_RANGE, NULL_RANGE, WRAP_RANGE;
+    static final List<InetAddressAndPort> ALL_EP;
+    static final List<Range<Token>> ALL_R;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+            EP3 = InetAddressAndPort.getByName("127.0.0.3");
+            EP4 = InetAddressAndPort.getByName("127.0.0.4");
+            EP5 = InetAddressAndPort.getByName("127.0.0.5");
+            EP6 = InetAddressAndPort.getByName("127.0.0.6");
+            EP7 = InetAddressAndPort.getByName("127.0.0.7");
+            EP8 = InetAddressAndPort.getByName("127.0.0.8");
+            EP9 = InetAddressAndPort.getByName("127.0.0.9");
+            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
+            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
+            R1 = range(0, 1);
+            R2 = range(1, 2);
+            R3 = range(2, 3);
+            R4 = range(3, 4);
+            R5 = range(4, 5);
+            R6 = range(5, 6);
+            R7 = range(6, 7);
+            R8 = range(7, 8);
+            R9 = range(8, 9);
+            BROADCAST_RANGE = range(10, 11);
+            NULL_RANGE = range(10000, 10001);
+            WRAP_RANGE = range(100000, 0);
+            ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP);
+            ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE, 
WRAP_RANGE);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    static void assertEquals(AbstractReplicaCollection<?> a, 
AbstractReplicaCollection<?> b)
+    {
+        Assert.assertEquals(a.list, b.list);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index d31d3f1..15fbd27 100644
--- 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -173,9 +173,15 @@ public class WriteResponseHandlerTransientTest
     private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite 
expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> 
livePredicate)
     {
         ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, 
livePredicate);
-        Assert.assertTrue(Iterables.elementsEqual(expected.pending(), 
actual.pending()));
-        Assert.assertTrue(Iterables.elementsEqual(expected.live(), 
actual.live()));
-        Assert.assertTrue(Iterables.elementsEqual(expected.contacts(), 
actual.contacts()));
+        assertEquals(expected.pending(), actual.pending());
+        assertEquals(expected.live(), actual.live());
+        assertEquals(expected.contacts(), actual.contacts());
+    }
+
+    private static void assertEquals(ReplicaCollection<?> a, 
ReplicaCollection<?> b)
+    {
+        if (!Iterables.elementsEqual(a, b))
+            Assert.assertTrue(a + " vs " + b, false);
     }
 
     private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... 
endpoints)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to