Fix pending range calculation during moves (3.0 version) patch by kohlisankalp; reviewed by blambov for CASSANDRA-10887
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c1679d1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c1679d1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c1679d1 Branch: refs/heads/cassandra-3.3 Commit: 9c1679d1bd83d1d25fda6dbf29d1738d8e966da5 Parents: 08b241c Author: sankalp kohli <kohlisank...@gmail.com> Authored: Thu Jan 7 16:24:06 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Jan 8 15:25:36 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/dht/Range.java | 21 + .../apache/cassandra/locator/TokenMetadata.java | 34 +- test/unit/org/apache/cassandra/Util.java | 4 +- .../org/apache/cassandra/dht/RangeTest.java | 55 ++ .../org/apache/cassandra/service/MoveTest.java | 496 ++++++++++++++++++- 6 files changed, 581 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f6b761..1e7f4ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -26,6 +26,7 @@ Merged from 2.2: * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761) Merged from 2.1: + * Fix pending range calculation during moves (CASSANDRA-10887) * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-9708) * Match cassandra-loader options in COPY FROM (CASSANDRA-9303) * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 985d6f6..b4fed65 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -291,7 +291,28 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen return rhs.differenceToFetch(this); } + public Set<Range<T>> subtractAll(Collection<Range<T>> ranges) + { + Set<Range<T>> result = new HashSet<>(); + result.add(this); + for(Range<T> range : ranges) + { + result = substractAllFromToken(result, range); + } + + return result; + } + private static <T extends RingPosition<T>> Set<Range<T>> substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract) + { + Set<Range<T>> result = new HashSet<>(); + for(Range<T> range : ranges) + { + result.addAll(range.subtract(subtract)); + } + + return result; + } /** * Calculate set of the difference ranges of given two ranges * (as current (A, B] and rhs is (C, D]) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 301613c..f6e9cf7 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -814,14 +814,42 @@ public class TokenMetadata // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. for (Pair<Token, InetAddress> moving : movingEndpoints) { + //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. + Set<Range<Token>> moveAffectedRanges = new HashSet<>(); InetAddress endpoint = moving.right; // address of the moving node + //Add ranges before the move + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + { + moveAffectedRanges.add(range); + } - // moving.left is a new token of the endpoint allLeftMetadata.updateNormalToken(moving.left, endpoint); - + //Add ranges after the move for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) { - newPendingRanges.addPendingRange(range, endpoint); + moveAffectedRanges.add(range); + } + + for(Range<Token> range : moveAffectedRanges) + { + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints); + for(final InetAddress address : difference) + { + Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); + Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); + //We want to get rid of any ranges which the node is currently getting. + newRanges.removeAll(oldRanges); + + for(Range<Token> newRange : newRanges) + { + for(Range<Token> pendingRange : newRange.subtractAll(oldRanges)) + { + newPendingRanges.addPendingRange(pendingRange, address); + } + } + } } allLeftMetadata.removeEndpoint(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 91ec6b6..8658ff3 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -191,9 +191,11 @@ public class Util for (int i = hostIdPool.size(); i < howMany; i++) hostIdPool.add(UUID.randomUUID()); + boolean endpointTokenPrefilled = endpointTokens != null && !endpointTokens.isEmpty(); for (int i=0; i<howMany; i++) { - endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + if(!endpointTokenPrefilled) + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5))); hostIds.add(hostIdPool.get(i)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java index 9c87981..1dd4c5a 100644 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.dht; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -29,6 +30,7 @@ import com.google.common.base.Joiner; import org.apache.commons.lang3.StringUtils; import org.junit.Test; +import org.apache.commons.collections.CollectionUtils; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; @@ -327,6 +329,59 @@ public class RangeTest assert t1.compareTo(t4) == 0; } + private Range<Token> makeRange(long token1, long token2) + { + return new Range<>(new Murmur3Partitioner.LongToken(token1), new Murmur3Partitioner.LongToken(token2)); + } + + private void assertRanges(Set<Range<Token>> result, Long ... tokens) + { + assert tokens.length % 2 ==0; + + final Set<Range<Token>> expected = new HashSet<>(); + for(int i=0; i < tokens.length; i+=2) + { + expected.add(makeRange(tokens[i], tokens[i+1])); + } + + assert CollectionUtils.isEqualCollection(result, expected); + + } + + @Test + public void testSubtractAll() + { + Range<Token> range = new Range<Token>(new Murmur3Partitioner.LongToken(1L), new Murmur3Partitioner.LongToken(100L)); + + Collection<Range<Token>> collection = new HashSet<>(); + collection.add(makeRange(1L, 10L)); + assertRanges(range.subtractAll(collection), 10L, 100L); + collection.add(makeRange(90L, 100L)); + assertRanges(range.subtractAll(collection), 10L, 90L); + collection.add(makeRange(54L, 60L)); + assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 90L); + collection.add(makeRange(80L, 95L)); + assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 80L); + } + + @Test + public void testSubtractAllWithWrapAround() + { + Range<Token> range = new Range<Token>(new Murmur3Partitioner.LongToken(100L), new Murmur3Partitioner.LongToken(10L)); + + Collection<Range<Token>> collection = new HashSet<>(); + collection.add(makeRange(20L, 30L)); + assertRanges(range.subtractAll(collection), 100L, 10L); + collection.add(makeRange(200L, 500L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 10L); + collection.add(makeRange(1L, 10L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1L); + collection.add(makeRange(0L, 1L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 0L); + collection.add(makeRange(1000L, 0)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1000L); + } + private Range<Token> makeRange(String token1, String token2) { return new Range<Token>(new BigIntegerToken(token1), new BigIntegerToken(token2)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c1679d1/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 97fb39e..280c4e6 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -30,6 +30,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.Schema; @@ -41,23 +44,36 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.locator.PendingRangeMaps; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Tables; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; public class MoveTest { private static final IPartitioner partitioner = RandomPartitioner.instance; private static IPartitioner oldPartitioner; - private static final String KEYSPACE1 = "MoveTestKeyspace1"; + //Simple Strategy Keyspaces + private static final String Simple_RF1_KeyspaceName = "MoveTestKeyspace1"; + private static final String Simple_RF2_KeyspaceName = "MoveTestKeyspace5"; + private static final String Simple_RF3_KeyspaceName = "MoveTestKeyspace4"; private static final String KEYSPACE2 = "MoveTestKeyspace2"; private static final String KEYSPACE3 = "MoveTestKeyspace3"; - private static final String KEYSPACE4 = "MoveTestKeyspace4"; + + //Network Strategy Keyspace with RF DC1=1 and DC2=1 and so on. + private static final String Network_11_KeyspaceName = "MoveTestNetwork11"; + private static final String Network_22_KeyspaceName = "MoveTestNetwork22"; + private static final String Network_33_KeyspaceName = "MoveTestNetwork33"; /* * NOTE: the tests above uses RandomPartitioner, which is not the default @@ -71,6 +87,9 @@ public class MoveTest oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); SchemaLoader.loadSchema(); SchemaLoader.schemaDefinition("MoveTest"); + addNetworkTopologyKeyspace(Network_11_KeyspaceName, 1, 1); + addNetworkTopologyKeyspace(Network_22_KeyspaceName, 2, 2); + addNetworkTopologyKeyspace(Network_33_KeyspaceName, 3, 3); } @AfterClass @@ -86,6 +105,429 @@ public class MoveTest StorageService.instance.getTokenMetadata().clearUnsafe(); } + private static void addNetworkTopologyKeyspace(String keyspaceName, Integer... replicas) throws ConfigurationException + { + + DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch() + { + //Odd IPs are in DC1 and Even are in DC2. Endpoints upto .14 will have unique racks and + // then will be same for a set of three. + @Override + public String getRack(InetAddress endpoint) + { + int ipLastPart = getIPLastPart(endpoint); + if (ipLastPart <= 14) + return UUID.randomUUID().toString(); + else + return "RAC" + (ipLastPart % 3); + } + + @Override + public String getDatacenter(InetAddress endpoint) + { + if (getIPLastPart(endpoint) % 2 == 0) + return "DC2"; + else + return "DC1"; + } + + private int getIPLastPart(InetAddress endpoint) + { + String str = endpoint.toString(); + int index = str.lastIndexOf("."); + return Integer.parseInt(str.substring(index + 1).trim()); + } + }); + + KeyspaceMetadata keyspace = KeyspaceMetadata.create(keyspaceName, + KeyspaceParams.nts(configOptions(replicas)), + Tables.of(CFMetaData.Builder.create(keyspaceName, "CF1") + .addPartitionKey("key", BytesType.instance).build())); + MigrationManager.announceNewKeyspace(keyspace); + } + + private static Object[] configOptions(Integer[] replicas) + { + Object[] configOptions = new Object[(replicas.length * 2)]; + int i = 1, j=0; + for(Integer replica : replicas) + { + if(replica == null) + continue; + configOptions[j++] = "DC" + i++; + configOptions[j++] = replica; + } + return configOptions; + } + + @Test + public void testMoveWithPendingRangesNetworkStrategyRackAwareThirtyNodes() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 60; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + for(int i=0; i < RING_SIZE/2; i++) + { + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1))); + } + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + //Moving Endpoint 127.0.0.37 in RAC1 with current token 180 + int MOVING_NODE = 36; + moveHost(hosts.get(MOVING_NODE), 215, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(150, 151, "127.0.0.43"), + generatePendingMapEntry(151, 160, "127.0.0.43"),generatePendingMapEntry(160, 161, "127.0.0.43"), + generatePendingMapEntry(161, 170, "127.0.0.43"), generatePendingMapEntry(170, 171, "127.0.0.43"), + generatePendingMapEntry(171, 180, "127.0.0.43"), generatePendingMapEntry(210, 211, "127.0.0.37"), + generatePendingMapEntry(211, 215, "127.0.0.37")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 215, tmd); + + //Moving it back to original spot + moveHost(hosts.get(MOVING_NODE), 180, tmd, valueFactory); + finishMove(hosts.get(MOVING_NODE), 180, tmd); + + } + + @Test + public void testMoveWithPendingRangesNetworkStrategyTenNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 14; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + for(int i=0; i < RING_SIZE/2; i++) + { + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1))); + } + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + int MOVING_NODE = 0; + moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 5, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.3"), + generatePendingMapEntry(1, 5, "127.0.0.3")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.5"), + generatePendingMapEntry(1, 5, "127.0.0.5")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.7"), + generatePendingMapEntry(1, 5, "127.0.0.7")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + MOVING_NODE = 1; + moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 5, tmd); + + moveHost(hosts.get(MOVING_NODE), 1, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.4")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.6")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.8")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1, tmd); + + MOVING_NODE = 3; + moveHost(hosts.get(MOVING_NODE), 25, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.6"), + generatePendingMapEntry(10, 11, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.6"), + generatePendingMapEntry(0, 1, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"), + generatePendingMapEntry(11, 20, "127.0.0.4"),generatePendingMapEntry(20, 21, "127.0.0.4")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.6"), + generatePendingMapEntry(60, 61, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"), + generatePendingMapEntry(11, 20, "127.0.0.4"), generatePendingMapEntry(20, 21, "127.0.0.4")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 25, tmd); + + moveHost(hosts.get(MOVING_NODE), 11, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.4"), + generatePendingMapEntry(10, 11, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.8")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.4"), + generatePendingMapEntry(0, 1, "127.0.0.4"), generatePendingMapEntry(11, 20, "127.0.0.8"), + generatePendingMapEntry(20, 21, "127.0.0.8"), generatePendingMapEntry(21, 25, "127.0.0.10")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.4"), + generatePendingMapEntry(60, 61, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.12"), + generatePendingMapEntry(11, 20, "127.0.0.10"), generatePendingMapEntry(20, 21, "127.0.0.10")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 11, tmd); + } + + @Test + public void testMoveWithPendingRangesSimpleStrategyTenNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 10; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + final int MOVING_NODE = 0; // index of the moving node + moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 2, tmd); + + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 1000, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1000, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 35, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(90, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(80, 90, "127.0.0.2"), generatePendingMapEntry(90, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(80, 90, "127.0.0.3"), generatePendingMapEntry(90, 0, "127.0.0.4"), + generatePendingMapEntry(10, 20, "127.0.0.1"), generatePendingMapEntry(70, 80, "127.0.0.2")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 35, tmd); + + } + + @Test + public void testMoveWithPendingRangesForSimpleStrategyFourNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 4; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + int MOVING_NODE = 0; // index of the moving node + moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 2, tmd); + + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 1500, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1500, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 15, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(10, 15, "127.0.0.1"), generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.2"), generatePendingMapEntry(10, 15, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.2"), + generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 15, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"), + generatePendingMapEntry(10, 15, "127.0.0.3")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(10, 15, "127.0.0.4"), generatePendingMapEntry(0, 10, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 26, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.1"), + generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.2"), + generatePendingMapEntry(30, 0, "127.0.0.3"), generatePendingMapEntry(10, 20, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 10, "127.0.0.1"), + generatePendingMapEntry(26, 30, "127.0.0.3")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 26, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.4"), + generatePendingMapEntry(30, 0, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"), + generatePendingMapEntry(26, 30, "127.0.0.1"), generatePendingMapEntry(10, 20, "127.0.0.4")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + MOVING_NODE = 3; + + moveHost(hosts.get(MOVING_NODE), 33, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 33, tmd); + + moveHost(hosts.get(MOVING_NODE), 30, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.2")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.3")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 30, tmd); + } + + private void moveHost(InetAddress host, int token, TokenMetadata tmd, VersionedValue.VersionedValueFactory valueFactory ) + { + StorageService.instance.onChange(host, ApplicationState.STATUS, valueFactory.moving(new BigIntegerToken(String.valueOf(token)))); + PendingRangeCalculatorService.instance.blockUntilFinished(); + assertTrue(tmd.isMoving(host)); + } + + private void finishMove(InetAddress host, int token, TokenMetadata tmd) + { + tmd.removeFromMoving(host); + assertTrue(!tmd.isMoving(host)); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host); + } + + private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException + { + Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>(); + pendingRanges.put(generateRange(start, end), makeAddrs(endpoints)); + return pendingRanges.entrySet().iterator().next(); + } + + private Map<Range<Token>, Collection<InetAddress>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddress>>... entries) + { + Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>(); + for(Map.Entry<Range<Token>, Collection<InetAddress>> entry : entries) + { + pendingRanges.put(entry.getKey(), entry.getValue()); + } + return pendingRanges; + } + + private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddress>> pendingRanges, String keyspaceName) throws ConfigurationException + { + boolean keyspaceFound = false; + for (String nonSystemKeyspaceName : Schema.instance.getNonSystemKeyspaces()) + { + if(!keyspaceName.equals(nonSystemKeyspaceName)) + continue; + assertMaps(pendingRanges, tmd.getPendingRanges(keyspaceName)); + keyspaceFound = true; + } + + assert keyspaceFound; + } + + private void assertMaps(Map<Range<Token>, Collection<InetAddress>> expected, PendingRangeMaps actual) + { + int sizeOfActual = 0; + Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator = actual.iterator(); + while(iterator.hasNext()) + { + Map.Entry<Range<Token>, List<InetAddress>> actualEntry = iterator.next(); + assertNotNull(expected.get(actualEntry.getKey())); + assertEquals(new HashSet<>(expected.get(actualEntry.getKey())), new HashSet<>(actualEntry.getValue())); + sizeOfActual++; + } + + assertEquals(expected.size(), sizeOfActual); + } + /* * Test whether write endpoints is correct when the node is moving. Uses * StorageService.onChange and does not manipulate token metadata directly. @@ -132,6 +574,8 @@ public class MoveTest for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { strategy = getStrategy(keyspaceName, tmd); + if(strategy instanceof NetworkTopologyStrategy) + continue; int numMoved = 0; for (Token token : keyTokens) { @@ -239,7 +683,7 @@ public class MoveTest * } */ - Multimap<InetAddress, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(KEYSPACE1).getAddressRanges(); + Multimap<InetAddress, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressRanges(); Collection<Range<Token>> ranges1 = keyspace1ranges.get(InetAddress.getByName("127.0.0.1")); assertEquals(1, collectionSize(ranges1)); assertEquals(generateRange(97, 0), ranges1.iterator().next()); @@ -336,7 +780,7 @@ public class MoveTest * /127.0.0.10=[(70,87], (87,97], (67,70]] * } */ - Multimap<InetAddress, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(KEYSPACE4).getAddressRanges(); + Multimap<InetAddress, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressRanges(); ranges1 = keyspace4ranges.get(InetAddress.getByName("127.0.0.1")); assertEquals(collectionSize(ranges1), 3); assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 87, 97))); @@ -370,17 +814,17 @@ public class MoveTest // pre-calculate the results. Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>(); - expectedEndpoints.put(KEYSPACE1, HashMultimap.<Token, InetAddress>create()); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.1.1")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9")); - expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10")); + expectedEndpoints.put(Simple_RF1_KeyspaceName, HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.1.1")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9")); + expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10")); expectedEndpoints.put(KEYSPACE2, HashMultimap.<Token, InetAddress>create()); expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2")); expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3")); @@ -403,17 +847,17 @@ public class MoveTest expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.1.2")); expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3")); expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4")); - expectedEndpoints.put(KEYSPACE4, HashMultimap.<Token, InetAddress>create()); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.1")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.1", "127.0.1.2")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1")); - expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2")); + expectedEndpoints.put(Simple_RF3_KeyspaceName, HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.1")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.1", "127.0.1.2")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1")); + expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2")); for (Map.Entry<String, AbstractReplicationStrategy> keyspaceStrategy : keyspaceStrategyMap.entrySet()) {