Fix pending range calculation during moves. Patch by kohlisankalp; reviewed by blambov for CASSANDRA-10887.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/812df9e8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/812df9e8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/812df9e8 Branch: refs/heads/cassandra-3.0 Commit: 812df9e8bc3cb98258a70a4b34cd6e289ff95e27 Parents: 6d6d189 Author: sankalp kohli <kohlisank...@gmail.com> Authored: Tue Jan 5 15:09:06 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Jan 8 15:18:45 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/dht/Range.java | 21 + .../service/PendingRangeCalculatorService.java | 36 +- test/unit/org/apache/cassandra/Util.java | 4 +- .../org/apache/cassandra/dht/RangeTest.java | 83 +++- .../org/apache/cassandra/service/MoveTest.java | 435 +++++++++++++++++++ 6 files changed, 557 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 14c5ee6..c167098 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.13 + * Fix pending range calculation during moves (CASSANDRA-10887) * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-9708) * Match cassandra-loader options in COPY FROM (CASSANDRA-9303) * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309) http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 81c92a2..618a3f4 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ 
b/src/java/org/apache/cassandra/dht/Range.java @@ -300,7 +300,28 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen return rhs.differenceToFetch(this); } + public Set<Range<T>> subtractAll(Collection<Range<T>> ranges) + { + Set<Range<T>> result = new HashSet<>(); + result.add(this); + for(Range<T> range : ranges) + { + result = substractAllFromToken(result, range); + } + + return result; + } + private static <T extends RingPosition<T>> Set<Range<T>> substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract) + { + Set<Range<T>> result = new HashSet<>(); + for(Range<T> range : ranges) + { + result.addAll(range.subtract(subtract)); + } + + return result; + } /** * Calculate set of the difference ranges of given two ranges * (as current (A, B] and rhs is (C, D]) http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index 0ff8a92..1e7b7bd 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -169,18 +169,44 @@ public class PendingRangeCalculatorService // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes. // We can now finish the calculation by checking moving and relocating nodes. - // For each of the moving nodes, we do the same thing we did for bootstrapping: - // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints()) { + //Calculate all the ranges which will could be affected. This will include the ranges before and after the move. 
+ Set<Range<Token>> moveAffectedRanges = new HashSet<>(); InetAddress endpoint = moving.right; // address of the moving node + //Add ranges before the move + for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) + { + moveAffectedRanges.add(range); + } - // moving.left is a new token of the endpoint allLeftMetadata.updateNormalToken(moving.left, endpoint); - + //Add ranges after the move for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint)) { - pendingRanges.put(range, endpoint); + moveAffectedRanges.add(range); + } + + for(Range<Token> range : moveAffectedRanges) + { + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints); + for(final InetAddress address : difference) + { + Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address); + Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address); + //We want to get rid of any ranges which the node is currently getting. 
+ newRanges.removeAll(oldRanges); + + for(Range<Token> newRange : newRanges) + { + for(Range<Token> pendingRange : newRange.subtractAll(oldRanges)) + { + pendingRanges.put(pendingRange, address); + } + } + } } allLeftMetadata.removeEndpoint(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index e05468f..3c2d32c 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -252,9 +252,11 @@ public class Util for (int i = hostIdPool.size(); i < howMany; i++) hostIdPool.add(UUID.randomUUID()); + boolean endpointTokenPrefilled = endpointTokens != null && !endpointTokens.isEmpty(); for (int i=0; i<howMany; i++) { - endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + if(!endpointTokenPrefilled) + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5))); hostIds.add(hostIdPool.get(i)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java index 1d8123b..2083f53 100644 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@ -19,17 +19,13 @@ package org.apache.cassandra.dht; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; +import java.util.*; import com.google.common.base.Joiner; import static java.util.Arrays.asList; +import org.apache.commons.collections.CollectionUtils; import 
org.apache.commons.lang3.StringUtils; import org.junit.Test; @@ -191,12 +187,12 @@ public class RangeTest Set<Range<T>> correct = Range.rangeSet(ranges); Set<Range> result1 = one.intersectionWith(two); assert result1.equals(correct) : String.format("%s != %s", - StringUtils.join(result1, ","), - StringUtils.join(correct, ",")); + StringUtils.join(result1, ","), + StringUtils.join(correct, ",")); Set<Range> result2 = two.intersectionWith(one); assert result2.equals(correct) : String.format("%s != %s", - StringUtils.join(result2, ","), - StringUtils.join(correct, ",")); + StringUtils.join(result2, ","), + StringUtils.join(correct, ",")); } private void assertNoIntersection(Range wraps1, Range nowrap3) @@ -265,15 +261,15 @@ public class RangeTest Range nowrap2 = new Range(new BigIntegerToken("0"), new BigIntegerToken("100")); assertIntersection(wraps1, - nowrap1, - new Range(new BigIntegerToken("0"), new BigIntegerToken("10")), - new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); + nowrap1, + new Range(new BigIntegerToken("0"), new BigIntegerToken("10")), + new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); assertIntersection(wraps2, - nowrap1, - new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); + nowrap1, + new Range(new BigIntegerToken("100"), new BigIntegerToken("200"))); assertIntersection(wraps1, - nowrap2, - new Range(new BigIntegerToken("0"), new BigIntegerToken("10"))); + nowrap2, + new Range(new BigIntegerToken("0"), new BigIntegerToken("10"))); } @Test @@ -331,6 +327,59 @@ public class RangeTest return new Range(new BigIntegerToken(token1), new BigIntegerToken(token2)); } + private Range<Token> makeRange(long token1, long token2) + { + return new Range<Token>(new LongToken(token1), new LongToken(token2)); + } + + private void assertRanges(Set<Range<Token>> result, Long ... 
tokens) + { + assert tokens.length % 2 ==0; + + final Set<Range<Token>> expected = new HashSet<>(); + for(int i=0; i < tokens.length; i+=2) + { + expected.add(makeRange(tokens[i], tokens[i+1])); + } + + assert CollectionUtils.isEqualCollection(result, expected); + + } + + @Test + public void testSubtractAll() + { + Range<Token> range = new Range<Token>(new LongToken(1L), new LongToken(100L)); + + Collection<Range<Token>> collection = new HashSet<>(); + collection.add(makeRange(1L, 10L)); + assertRanges(range.subtractAll(collection), 10L, 100L); + collection.add(makeRange(90L, 100L)); + assertRanges(range.subtractAll(collection), 10L, 90L); + collection.add(makeRange(54L, 60L)); + assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 90L); + collection.add(makeRange(80L, 95L)); + assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 80L); + } + + @Test + public void testSubtractAllWithWrapAround() + { + Range<Token> range = new Range<Token>(new LongToken(100L), new LongToken(10L)); + + Collection<Range<Token>> collection = new HashSet<>(); + collection.add(makeRange(20L, 30L)); + assertRanges(range.subtractAll(collection), 100L, 10L); + collection.add(makeRange(200L, 500L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 10L); + collection.add(makeRange(1L, 10L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1L); + collection.add(makeRange(0L, 1L)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 0L); + collection.add(makeRange(1000L, 0)); + assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1000L); + } + private Set<Range> makeRanges(String[][] tokenPairs) { Set<Range> ranges = new HashSet<Range>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java index 821fff0..49e3391 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -27,7 +27,13 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import static org.junit.Assert.*; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -60,6 +66,9 @@ public class MoveTest { oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner); SchemaLoader.loadSchema(); + addNetworkTopologyKeyspace(Network_11_KeyspaceName, 1, 1); + addNetworkTopologyKeyspace(Network_22_KeyspaceName, 2, 2); + addNetworkTopologyKeyspace(Network_33_KeyspaceName, 3, 3); } @AfterClass @@ -69,6 +78,430 @@ public class MoveTest SchemaLoader.stopGossiper(); } + //Simple Strategy Keyspaces with RF1, 2 and 3 + private static final String Simple_RF1_KeyspaceName = "Keyspace6"; + private static final String Simple_RF2_KeyspaceName = "Keyspace5"; + private static final String Simple_RF3_KeyspaceName = "Keyspace4"; + //Network Strategy Keyspace with RF DC1=1 and DC2=1 and so on. + private static final String Network_11_KeyspaceName = "Network11"; + private static final String Network_22_KeyspaceName = "Network22"; + private static final String Network_33_KeyspaceName = "Network33"; + + private static void addNetworkTopologyKeyspace(String keyspaceName, Integer... replicas) throws ConfigurationException + { + + DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch() + { + //Odd IPs are in DC1 and Even are in DC2. 
Endpoints upto .14 will have unique racks and + // then will be same for a set of three. + @Override + public String getRack(InetAddress endpoint) + { + int ipLastPart = getIPLastPart(endpoint); + if (ipLastPart <= 14) + return UUID.randomUUID().toString(); + else + return "RAC" + (ipLastPart % 3); + } + + @Override + public String getDatacenter(InetAddress endpoint) + { + if (getIPLastPart(endpoint) % 2 == 0) + return "DC2"; + else + return "DC1"; + } + + private int getIPLastPart(InetAddress endpoint) + { + String str = endpoint.toString(); + int index = str.lastIndexOf("."); + return Integer.parseInt(str.substring(index + 1).trim()); + } + }); + + Class<? extends AbstractReplicationStrategy> strategy = NetworkTopologyStrategy.class; + KSMetaData keyspace = KSMetaData.testMetadata(keyspaceName, strategy, configOptions(replicas), + CFMetaData.sparseCFMetaData(keyspaceName, "CF1", BytesType.instance)); + MigrationManager.announceNewKeyspace(keyspace); + } + + private static Map<String, String> configOptions(Integer[] replicas) + { + Map<String, String> configOptions = new HashMap<>(); + int i = 1; + for(Integer replica : replicas) + { + if(replica == null) + continue; + configOptions.put("DC" + i++, String.valueOf(replica)); + } + return configOptions; + } + + @Test + public void testMoveWithPendingRangesNetworkStrategyRackAwareThirtyNodes() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 60; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + for(int i=0; i < RING_SIZE/2; i++) + { + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + endpointTokens.add(new 
BigIntegerToken(String.valueOf((10 * i) + 1))); + } + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + //Moving Endpoint 127.0.0.37 in RAC1 with current token 180 + int MOVING_NODE = 36; + moveHost(hosts.get(MOVING_NODE), 215, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(150, 151, "127.0.0.43"), + generatePendingMapEntry(151, 160, "127.0.0.43"),generatePendingMapEntry(160, 161, "127.0.0.43"), + generatePendingMapEntry(161, 170, "127.0.0.43"), generatePendingMapEntry(170, 171, "127.0.0.43"), + generatePendingMapEntry(171, 180, "127.0.0.43"), generatePendingMapEntry(210, 211, "127.0.0.37"), + generatePendingMapEntry(211, 215, "127.0.0.37")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 215, tmd); + + //Moving it back to original spot + moveHost(hosts.get(MOVING_NODE), 180, tmd, valueFactory); + finishMove(hosts.get(MOVING_NODE), 180, tmd); + + } + + @Test + public void testMoveWithPendingRangesNetworkStrategyTenNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 14; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + for(int i=0; i < RING_SIZE/2; i++) + { + endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); + endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1))); + } + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + int MOVING_NODE = 0; + moveHost(hosts.get(MOVING_NODE), 5, tmd, 
valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"), + generatePendingMapEntry(1, 5, "127.0.0.1")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 5, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.3"), + generatePendingMapEntry(1, 5, "127.0.0.3")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.5"), + generatePendingMapEntry(1, 5, "127.0.0.5")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.7"), + generatePendingMapEntry(1, 5, "127.0.0.7")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + MOVING_NODE = 1; + moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 5, tmd); + + moveHost(hosts.get(MOVING_NODE), 1, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.4")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.6")), Network_22_KeyspaceName); + assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.8")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1, tmd); + + MOVING_NODE = 3; + moveHost(hosts.get(MOVING_NODE), 25, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.6"), + generatePendingMapEntry(10, 11, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.6"), + generatePendingMapEntry(0, 1, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"), + generatePendingMapEntry(11, 20, "127.0.0.4"),generatePendingMapEntry(20, 21, "127.0.0.4")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.6"), + generatePendingMapEntry(60, 61, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"), + generatePendingMapEntry(11, 20, "127.0.0.4"), generatePendingMapEntry(20, 21, "127.0.0.4")), Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 25, tmd); + + moveHost(hosts.get(MOVING_NODE), 11, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.4"), + generatePendingMapEntry(10, 11, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.8")), Network_11_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.4"), + generatePendingMapEntry(0, 1, "127.0.0.4"), generatePendingMapEntry(11, 20, "127.0.0.8"), + generatePendingMapEntry(20, 21, "127.0.0.8"), generatePendingMapEntry(21, 25, "127.0.0.10")), Network_22_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.4"), + generatePendingMapEntry(60, 61, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.12"), + generatePendingMapEntry(11, 20, "127.0.0.10"), generatePendingMapEntry(20, 21, "127.0.0.10")), 
Network_33_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 11, tmd); + } + + @Test + public void testMoveWithPendingRangesSimpleStrategyTenNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 10; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); + ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + final int MOVING_NODE = 0; // index of the moving node + moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 2, tmd); + + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 1000, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1000, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 35, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(90, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(80, 90, "127.0.0.2"), generatePendingMapEntry(90, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(80, 90, "127.0.0.3"), generatePendingMapEntry(90, 0, "127.0.0.4"), + generatePendingMapEntry(10, 20, "127.0.0.1"), generatePendingMapEntry(70, 80, "127.0.0.2")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 35, tmd); + + } + + @Test + public void testMoveWithPendingRangesForSimpleStrategyFourNode() throws Exception + { + StorageService ss = StorageService.instance; + final int RING_SIZE = 4; + + TokenMetadata tmd = ss.getTokenMetadata(); + tmd.clearUnsafe(); + VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner); 
+ ArrayList<Token> endpointTokens = new ArrayList<>(); + ArrayList<Token> keyTokens = new ArrayList<>(); + List<InetAddress> hosts = new ArrayList<>(); + List<UUID> hostIds = new ArrayList<>(); + + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); + PendingRangeCalculatorService.instance.blockUntilFinished(); + + int MOVING_NODE = 0; // index of the moving node + moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 2, tmd); + + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 1500, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 1500, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), 
Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 15, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(10, 15, "127.0.0.1"), generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.2"), generatePendingMapEntry(10, 15, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.2"), + generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 15, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"), + generatePendingMapEntry(10, 15, "127.0.0.3")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.1"), + generatePendingMapEntry(10, 15, "127.0.0.4"), generatePendingMapEntry(0, 10, "127.0.0.3")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + moveHost(hosts.get(MOVING_NODE), 26, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.1"), + generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.2"), + 
generatePendingMapEntry(30, 0, "127.0.0.3"), generatePendingMapEntry(10, 20, "127.0.0.1")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 10, "127.0.0.1"), + generatePendingMapEntry(26, 30, "127.0.0.3")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 26, tmd); + + moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.4"), + generatePendingMapEntry(30, 0, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"), + generatePendingMapEntry(26, 30, "127.0.0.1"), generatePendingMapEntry(10, 20, "127.0.0.4")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.1"), + generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 0, tmd); + + MOVING_NODE = 3; + + moveHost(hosts.get(MOVING_NODE), 33, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 33, tmd); + + moveHost(hosts.get(MOVING_NODE), 30, tmd, valueFactory); + + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.1")), Simple_RF1_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.2")), Simple_RF2_KeyspaceName); + assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.3")), Simple_RF3_KeyspaceName); + + finishMove(hosts.get(MOVING_NODE), 30, tmd); + } + + private void 
moveHost(InetAddress host, int token, TokenMetadata tmd, VersionedValue.VersionedValueFactory valueFactory ) + { + StorageService.instance.onChange(host, ApplicationState.STATUS, valueFactory.moving(new BigIntegerToken(String.valueOf(token)))); + PendingRangeCalculatorService.instance.blockUntilFinished(); + assertTrue(tmd.isMoving(host)); + } + + private void finishMove(InetAddress host, int token, TokenMetadata tmd) + { + tmd.removeFromMoving(host); + assertTrue(!tmd.isMoving(host)); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host); + } + + private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException + { + Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>(); + pendingRanges.put(generateRange(start, end), makeAddrs(endpoints)); + return pendingRanges.entrySet().iterator().next(); + } + + private Map<Range<Token>, Collection<InetAddress>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddress>>... 
entries) + { + Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>(); + for(Map.Entry<Range<Token>, Collection<InetAddress>> entry : entries) + { + pendingRanges.put(entry.getKey(), entry.getValue()); + } + return pendingRanges; + } + + private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddress>> pendingRanges, String keyspaceName) throws ConfigurationException + { + boolean keyspaceFound = false; + for (String nonSystemKeyspaceName : Schema.instance.getNonSystemKeyspaces()) + { + if(!keyspaceName.equals(nonSystemKeyspaceName)) + continue; + assertMaps(pendingRanges, tmd.getPendingRanges(keyspaceName)); + keyspaceFound = true; + } + + assert keyspaceFound; + } + + private void assertMaps(Map<Range<Token>, Collection<InetAddress>> expected, Map<Range<Token>, Collection<InetAddress>> actual) + { + assertEquals(expected.size(), actual.size()); + for(Map.Entry<Range<Token>, Collection<InetAddress>> expectedEntry : expected.entrySet()) + { + assertNotNull(actual.get(expectedEntry.getKey())); + assertEquals(new ArrayList<>(expectedEntry.getValue()), new ArrayList<>(actual.get(expectedEntry.getKey()))); + } + } + /* * Test whether write endpoints is correct when the node is moving. Uses * StorageService.onChange and does not manipulate token metadata directly. @@ -116,6 +549,8 @@ public class MoveTest for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { strategy = getStrategy(keyspaceName, tmd); + if(strategy instanceof NetworkTopologyStrategy) + continue; int numMoved = 0; for (Token token : keyTokens) {