Fix pending range calculation during moves

patch by kohlisankalp; reviewed by blambov for CASSANDRA-10887


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/812df9e8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/812df9e8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/812df9e8

Branch: refs/heads/cassandra-3.0
Commit: 812df9e8bc3cb98258a70a4b34cd6e289ff95e27
Parents: 6d6d189
Author: sankalp kohli <kohlisank...@gmail.com>
Authored: Tue Jan 5 15:09:06 2016 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Jan 8 15:18:45 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/dht/Range.java    |  21 +
 .../service/PendingRangeCalculatorService.java  |  36 +-
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../org/apache/cassandra/dht/RangeTest.java     |  83 +++-
 .../org/apache/cassandra/service/MoveTest.java  | 435 +++++++++++++++++++
 6 files changed, 557 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14c5ee6..c167098 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.13
+ * Fix pending range calculation during moves (CASSANDRA-10887)
  * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-9708)
  * Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
  * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java 
b/src/java/org/apache/cassandra/dht/Range.java
index 81c92a2..618a3f4 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -300,7 +300,28 @@ public class Range<T extends RingPosition<T>> extends 
AbstractBounds<T> implemen
         return rhs.differenceToFetch(this);
     }
 
+    public Set<Range<T>> subtractAll(Collection<Range<T>> ranges)
+    {
+        Set<Range<T>> result = new HashSet<>();
+        result.add(this);
+        for(Range<T> range : ranges)
+        {
+            result = substractAllFromToken(result, range);
+        }
+
+        return result;
+    }
 
+    private static <T extends RingPosition<T>> Set<Range<T>> 
substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract)
+    {
+        Set<Range<T>> result = new HashSet<>();
+        for(Range<T> range : ranges)
+        {
+            result.addAll(range.subtract(subtract));
+        }
+
+        return result;
+    }
     /**
      * Calculate set of the difference ranges of given two ranges
      * (as current (A, B] and rhs is (C, D])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 0ff8a92..1e7b7bd 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -169,18 +169,44 @@ public class PendingRangeCalculatorService
         // At this stage pendingRanges has been updated according to leaving 
and bootstrapping nodes.
         // We can now finish the calculation by checking moving and relocating 
nodes.
 
-        // For each of the moving nodes, we do the same thing we did for 
bootstrapping:
-        // simply add and remove them one by one to allLeftMetadata and check 
in between what their ranges would be.
         for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
         {
+            //Calculate all the ranges which could be affected. This will 
include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
             InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-            //  moving.left is a new token of the endpoint
             allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
+            //Add ranges after the move
             for (Range<Token> range : 
strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                pendingRanges.put(range, endpoint);
+                moveAffectedRanges.add(range);
+            }
+
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = 
ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, 
allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, 
currentEndpoints);
+                for(final InetAddress address : difference)
+                {
+                    Collection<Range<Token>> newRanges = 
strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = 
strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node 
already has according to the current metadata.
+                    newRanges.removeAll(oldRanges);
+
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : 
newRange.subtractAll(oldRanges))
+                        {
+                            pendingRanges.put(pendingRange, address);
+                        }
+                    }
+                }
             }
 
             allLeftMetadata.removeEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index e05468f..3c2d32c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -252,9 +252,11 @@ public class Util
         for (int i = hostIdPool.size(); i < howMany; i++)
             hostIdPool.add(UUID.randomUUID());
 
+        boolean endpointTokenPrefilled = endpointTokens != null && 
!endpointTokens.isEmpty();
         for (int i=0; i<howMany; i++)
         {
-            endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+            if(!endpointTokenPrefilled)
+                endpointTokens.add(new BigIntegerToken(String.valueOf(10 * 
i)));
             keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
             hostIds.add(hostIdPool.get(i));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java 
b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 1d8123b..2083f53 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -19,17 +19,13 @@
 package org.apache.cassandra.dht;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.base.Joiner;
 
 import static java.util.Arrays.asList;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 
@@ -191,12 +187,12 @@ public class RangeTest
         Set<Range<T>> correct = Range.rangeSet(ranges);
         Set<Range> result1 = one.intersectionWith(two);
         assert result1.equals(correct) : String.format("%s != %s",
-                                                       
StringUtils.join(result1, ","),
-                                                       
StringUtils.join(correct, ","));
+                StringUtils.join(result1, ","),
+                StringUtils.join(correct, ","));
         Set<Range> result2 = two.intersectionWith(one);
         assert result2.equals(correct) : String.format("%s != %s",
-                                                       
StringUtils.join(result2, ","),
-                                                       
StringUtils.join(correct, ","));
+                StringUtils.join(result2, ","),
+                StringUtils.join(correct, ","));
     }
 
     private void assertNoIntersection(Range wraps1, Range nowrap3)
@@ -265,15 +261,15 @@ public class RangeTest
         Range nowrap2 = new Range(new BigIntegerToken("0"), new 
BigIntegerToken("100"));
 
         assertIntersection(wraps1,
-                           nowrap1,
-                           new Range(new BigIntegerToken("0"), new 
BigIntegerToken("10")),
-                           new Range(new BigIntegerToken("100"), new 
BigIntegerToken("200")));
+                nowrap1,
+                new Range(new BigIntegerToken("0"), new BigIntegerToken("10")),
+                new Range(new BigIntegerToken("100"), new 
BigIntegerToken("200")));
         assertIntersection(wraps2,
-                           nowrap1,
-                           new Range(new BigIntegerToken("100"), new 
BigIntegerToken("200")));
+                nowrap1,
+                new Range(new BigIntegerToken("100"), new 
BigIntegerToken("200")));
         assertIntersection(wraps1,
-                           nowrap2,
-                           new Range(new BigIntegerToken("0"), new 
BigIntegerToken("10")));
+                nowrap2,
+                new Range(new BigIntegerToken("0"), new 
BigIntegerToken("10")));
     }
 
     @Test
@@ -331,6 +327,59 @@ public class RangeTest
         return new Range(new BigIntegerToken(token1), new 
BigIntegerToken(token2));
     }
 
+    private Range<Token> makeRange(long token1, long token2)
+    {
+        return   new Range<Token>(new LongToken(token1), new 
LongToken(token2));
+    }
+
+    private void assertRanges(Set<Range<Token>> result, Long ... tokens)
+    {
+        assert tokens.length % 2 ==0;
+
+        final Set<Range<Token>> expected = new HashSet<>();
+        for(int i=0; i < tokens.length; i+=2)
+        {
+            expected.add(makeRange(tokens[i], tokens[i+1]));
+        }
+
+        assert CollectionUtils.isEqualCollection(result, expected);
+
+    }
+
+    @Test
+    public void testSubtractAll()
+    {
+        Range<Token> range = new Range<Token>(new LongToken(1L), new 
LongToken(100L));
+
+        Collection<Range<Token>> collection = new HashSet<>();
+        collection.add(makeRange(1L, 10L));
+        assertRanges(range.subtractAll(collection), 10L, 100L);
+        collection.add(makeRange(90L, 100L));
+        assertRanges(range.subtractAll(collection), 10L, 90L);
+        collection.add(makeRange(54L, 60L));
+        assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 90L);
+        collection.add(makeRange(80L, 95L));
+        assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 80L);
+    }
+
+    @Test
+    public void testSubtractAllWithWrapAround()
+    {
+        Range<Token> range = new Range<Token>(new LongToken(100L), new 
LongToken(10L));
+
+        Collection<Range<Token>> collection = new HashSet<>();
+        collection.add(makeRange(20L, 30L));
+        assertRanges(range.subtractAll(collection), 100L, 10L);
+        collection.add(makeRange(200L, 500L));
+        assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 10L);
+        collection.add(makeRange(1L, 10L));
+        assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1L);
+        collection.add(makeRange(0L, 1L));
+        assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 0L);
+        collection.add(makeRange(1000L, 0));
+        assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1000L);
+    }
+
     private Set<Range> makeRanges(String[][] tokenPairs)
     {
         Set<Range> ranges = new HashSet<Range>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java
index 821fff0..49e3391 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -27,7 +27,13 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import static org.junit.Assert.*;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,6 +66,9 @@ public class MoveTest
     {
         oldPartitioner = 
StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();
+        addNetworkTopologyKeyspace(Network_11_KeyspaceName, 1, 1);
+        addNetworkTopologyKeyspace(Network_22_KeyspaceName, 2, 2);
+        addNetworkTopologyKeyspace(Network_33_KeyspaceName, 3, 3);
     }
 
     @AfterClass
@@ -69,6 +78,430 @@ public class MoveTest
         SchemaLoader.stopGossiper();
     }
 
+    //Simple Strategy Keyspaces with RF1, 2 and 3
+    private static final String Simple_RF1_KeyspaceName = "Keyspace6";
+    private static final String Simple_RF2_KeyspaceName = "Keyspace5";
+    private static final String Simple_RF3_KeyspaceName = "Keyspace4";
+    //Network Strategy Keyspace with RF DC1=1 and DC2=1 and so on.
+    private static final String Network_11_KeyspaceName = "Network11";
+    private static final String Network_22_KeyspaceName = "Network22";
+    private static final String Network_33_KeyspaceName = "Network33";
+
+    private static void addNetworkTopologyKeyspace(String keyspaceName, 
Integer... replicas) throws ConfigurationException
+    {
+
+        DatabaseDescriptor.setEndpointSnitch(new 
AbstractNetworkTopologySnitch()
+        {
+            //Odd IPs are in DC1 and even ones are in DC2. Endpoints up to .14 will 
have unique racks and
+            // the rest are assigned to one of three racks (last IP part mod 3).
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                int ipLastPart = getIPLastPart(endpoint);
+                if (ipLastPart <= 14)
+                    return UUID.randomUUID().toString();
+                else
+                    return "RAC" + (ipLastPart % 3);
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                if (getIPLastPart(endpoint) % 2 == 0)
+                    return "DC2";
+                else
+                    return "DC1";
+            }
+
+            private int getIPLastPart(InetAddress endpoint)
+            {
+                String str = endpoint.toString();
+                int index = str.lastIndexOf(".");
+                return Integer.parseInt(str.substring(index + 1).trim());
+            }
+        });
+
+        Class<? extends AbstractReplicationStrategy> strategy = 
NetworkTopologyStrategy.class;
+        KSMetaData keyspace = KSMetaData.testMetadata(keyspaceName, strategy, 
configOptions(replicas),
+                CFMetaData.sparseCFMetaData(keyspaceName, "CF1", 
BytesType.instance));
+        MigrationManager.announceNewKeyspace(keyspace);
+    }
+
+    private static Map<String, String> configOptions(Integer[] replicas)
+    {
+        Map<String, String> configOptions = new HashMap<>();
+        int i = 1;
+        for(Integer replica : replicas)
+        {
+            if(replica == null)
+                continue;
+            configOptions.put("DC" + i++, String.valueOf(replica));
+        }
+        return configOptions;
+    }
+
+    @Test
+    public void testMoveWithPendingRangesNetworkStrategyRackAwareThirtyNodes() 
throws Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 60;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        ArrayList<Token> keyTokens = new ArrayList<>();
+        List<InetAddress> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+
+        for(int i=0; i < RING_SIZE/2; i++)
+        {
+            endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+            endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 
1)));
+        }
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
+        //Moving Endpoint 127.0.0.37 in RAC1 with current token 180
+        int MOVING_NODE = 36;
+        moveHost(hosts.get(MOVING_NODE), 215, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(150, 151, "127.0.0.43"),
+                generatePendingMapEntry(151, 160, 
"127.0.0.43"),generatePendingMapEntry(160, 161, "127.0.0.43"),
+                generatePendingMapEntry(161, 170, "127.0.0.43"), 
generatePendingMapEntry(170, 171, "127.0.0.43"),
+                generatePendingMapEntry(171, 180, "127.0.0.43"), 
generatePendingMapEntry(210, 211, "127.0.0.37"),
+                generatePendingMapEntry(211, 215, "127.0.0.37")), 
Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 215, tmd);
+
+        //Moving it back to original spot
+        moveHost(hosts.get(MOVING_NODE), 180, tmd, valueFactory);
+        finishMove(hosts.get(MOVING_NODE), 180, tmd);
+
+    }
+
+    @Test
+    public void testMoveWithPendingRangesNetworkStrategyTenNode() throws 
Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 14;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        ArrayList<Token> keyTokens = new ArrayList<>();
+        List<InetAddress> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+
+        for(int i=0; i < RING_SIZE/2; i++)
+        {
+            endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+            endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 
1)));
+        }
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
+        int MOVING_NODE = 0;
+        moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+                generatePendingMapEntry(1, 5, "127.0.0.1")), 
Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+                generatePendingMapEntry(1, 5, "127.0.0.1")), 
Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+                generatePendingMapEntry(1, 5, "127.0.0.1")), 
Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.3"),
+                generatePendingMapEntry(1, 5, "127.0.0.3")), 
Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.5"),
+                generatePendingMapEntry(1, 5, "127.0.0.5")), 
Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.7"),
+                generatePendingMapEntry(1, 5, "127.0.0.7")), 
Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        MOVING_NODE = 1;
+        moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), 
Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), 
Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), 
Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 1, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.4")), 
Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.6")), 
Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.8")), 
Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 1, tmd);
+
+        MOVING_NODE = 3;
+        moveHost(hosts.get(MOVING_NODE), 25, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.6"),
+                generatePendingMapEntry(10, 11, "127.0.0.6"), 
generatePendingMapEntry(21, 25, "127.0.0.4")), Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.6"),
+                generatePendingMapEntry(0, 1, "127.0.0.6"), 
generatePendingMapEntry(21, 25, "127.0.0.4"),
+                generatePendingMapEntry(11, 20, 
"127.0.0.4"),generatePendingMapEntry(20, 21, "127.0.0.4")), 
Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.6"),
+                generatePendingMapEntry(60, 61, "127.0.0.6"), 
generatePendingMapEntry(21, 25, "127.0.0.4"),
+                generatePendingMapEntry(11, 20, "127.0.0.4"), 
generatePendingMapEntry(20, 21, "127.0.0.4")), Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 25, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 11, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.4"),
+                generatePendingMapEntry(10, 11, "127.0.0.4"), 
generatePendingMapEntry(21, 25, "127.0.0.8")), Network_11_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.4"),
+                generatePendingMapEntry(0, 1, "127.0.0.4"), 
generatePendingMapEntry(11, 20, "127.0.0.8"),
+                generatePendingMapEntry(20, 21, "127.0.0.8"), 
generatePendingMapEntry(21, 25, "127.0.0.10")), Network_22_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.4"),
+                generatePendingMapEntry(60, 61, "127.0.0.4"), 
generatePendingMapEntry(21, 25, "127.0.0.12"),
+                generatePendingMapEntry(11, 20, "127.0.0.10"), 
generatePendingMapEntry(20, 21, "127.0.0.10")), Network_33_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 11, tmd);
+    }
+
+    @Test
+    public void testMoveWithPendingRangesSimpleStrategyTenNode() throws 
Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 10;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        ArrayList<Token> keyTokens = new ArrayList<>();
+        List<InetAddress> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
+        final int MOVING_NODE = 0; // index of the moving node
+        moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 1000, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.2")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.3")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 1000, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 35, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), 
generatePendingMapEntry(90, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), 
generatePendingMapEntry(20, 30, "127.0.0.1"),
+                generatePendingMapEntry(80, 90, "127.0.0.2"), 
generatePendingMapEntry(90, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), 
generatePendingMapEntry(20, 30, "127.0.0.1"),
+                generatePendingMapEntry(80, 90, "127.0.0.3"), 
generatePendingMapEntry(90, 0, "127.0.0.4"),
+                generatePendingMapEntry(10, 20, "127.0.0.1"), 
generatePendingMapEntry(70, 80, "127.0.0.2")), Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 35, tmd);
+
+    }
+
+    @Test
+    public void testMoveWithPendingRangesForSimpleStrategyFourNode() throws 
Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 4;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        ArrayList<Token> keyTokens = new ArrayList<>();
+        List<InetAddress> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
+        int MOVING_NODE = 0; // index of the moving node
+        moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 1500, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.2")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.3")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 1500, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 15, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(10, 15, "127.0.0.1"), 
generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.2"), 
generatePendingMapEntry(10, 15, "127.0.0.1"),
+                generatePendingMapEntry(0, 10, "127.0.0.1")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.2"),
+                generatePendingMapEntry(0, 10, "127.0.0.1")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 15, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+                generatePendingMapEntry(10, 15, "127.0.0.3")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.1"),
+                generatePendingMapEntry(10, 15, "127.0.0.4"), 
generatePendingMapEntry(0, 10, "127.0.0.3")), Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.1"),
+                generatePendingMapEntry(0, 10, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 26, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.1"),
+                generatePendingMapEntry(30, 0, "127.0.0.2")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.2"),
+                generatePendingMapEntry(30, 0, "127.0.0.3"), 
generatePendingMapEntry(10, 20, "127.0.0.1")), Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(0, 10, "127.0.0.1"),
+                generatePendingMapEntry(26, 30, "127.0.0.3")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 26, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.4"),
+                generatePendingMapEntry(30, 0, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+                generatePendingMapEntry(26, 30, "127.0.0.1"), 
generatePendingMapEntry(10, 20, "127.0.0.4")), Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.1"),
+                generatePendingMapEntry(0, 10, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+        MOVING_NODE = 3;
+
+        moveHost(hosts.get(MOVING_NODE), 33, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 33, tmd);
+
+        moveHost(hosts.get(MOVING_NODE), 30, tmd, valueFactory);
+
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.1")), 
Simple_RF1_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.2")), 
Simple_RF2_KeyspaceName);
+        assertPendingRanges(tmd, 
generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.3")), 
Simple_RF3_KeyspaceName);
+
+        finishMove(hosts.get(MOVING_NODE), 30, tmd);
+    }
+
+    /**
+     * Publishes a "moving" STATUS change for {@code host} towards {@code token} via
+     * StorageService.onChange, blocks on PendingRangeCalculatorService.blockUntilFinished(),
+     * and then asserts that {@code tmd} reports the host as moving.
+     */
+    private void moveHost(InetAddress host, int token, TokenMetadata tmd, VersionedValue.VersionedValueFactory valueFactory )
+    {
+        BigIntegerToken targetToken = new BigIntegerToken(String.valueOf(token));
+        VersionedValue movingStatus = valueFactory.moving(targetToken);
+        StorageService.instance.onChange(host, ApplicationState.STATUS, movingStatus);
+        // Block before inspecting tmd so the check below does not race the calculator service.
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+        assertTrue(tmd.isMoving(host));
+    }
+
+    /**
+     * Finishes a move directly on {@code tmd}: calls removeFromMoving for {@code host},
+     * asserts the host is no longer reported as moving, then registers {@code token}
+     * as the host's normal token.
+     */
+    private void finishMove(InetAddress host, int token, TokenMetadata tmd)
+    {
+        tmd.removeFromMoving(host);
+        boolean stillMoving = tmd.isMoving(host);
+        assertTrue(!stillMoving);
+        BigIntegerToken settledToken = new BigIntegerToken(String.valueOf(token));
+        tmd.updateNormalToken(settledToken, host);
+    }
+
+    /**
+     * Builds one expected pending-range entry: the range produced by
+     * generateRange(start, end) mapped to the addresses produced by makeAddrs(endpoints).
+     * The entry is obtained from a throwaway single-element HashMap.
+     */
+    private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
+    {
+        Range<Token> range = generateRange(start, end);
+        Collection<InetAddress> addresses = makeAddrs(endpoints);
+        Map<Range<Token>, Collection<InetAddress>> singleEntryMap = new HashMap<>();
+        singleEntryMap.put(range, addresses);
+        return singleEntryMap.entrySet().iterator().next();
+    }
+
+    /**
+     * Collects the given entries into a new HashMap keyed by range.
+     * If two entries share a key, the later one replaces the earlier one.
+     */
+    private Map<Range<Token>, Collection<InetAddress>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddress>>... entries)
+    {
+        Map<Range<Token>, Collection<InetAddress>> merged = new HashMap<>();
+        for (int i = 0; i < entries.length; i++)
+        {
+            Map.Entry<Range<Token>, Collection<InetAddress>> current = entries[i];
+            merged.put(current.getKey(), current.getValue());
+        }
+        return merged;
+    }
+
+    /**
+     * Asserts that {@code keyspaceName} is one of Schema.instance.getNonSystemKeyspaces()
+     * and that tmd.getPendingRanges(keyspaceName) matches {@code pendingRanges}
+     * (compared with assertMaps).
+     *
+     * The keyspace check uses a JUnit assertion rather than the Java assert keyword so that
+     * it is enforced even when the JVM runs without -ea, and so a failure names the keyspace.
+     */
+    private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>,  Collection<InetAddress>> pendingRanges, String keyspaceName) throws ConfigurationException
+    {
+        boolean keyspaceFound = false;
+        for (String nonSystemKeyspaceName : Schema.instance.getNonSystemKeyspaces())
+        {
+            if (keyspaceName.equals(nonSystemKeyspaceName))
+            {
+                keyspaceFound = true;
+                break;
+            }
+        }
+        assertTrue("Keyspace " + keyspaceName + " is not among the non-system keyspaces", keyspaceFound);
+        assertMaps(pendingRanges, tmd.getPendingRanges(keyspaceName));
+    }
+
+    /**
+     * Asserts that both maps have the same number of entries and that, for every key of
+     * {@code expected}, {@code actual} holds a non-null collection whose elements equal the
+     * expected ones when both are copied into lists (so iteration order matters).
+     */
+    private void assertMaps(Map<Range<Token>,  Collection<InetAddress>> expected, Map<Range<Token>,  Collection<InetAddress>> actual)
+    {
+        assertEquals(expected.size(), actual.size());
+        for (Map.Entry<Range<Token>,  Collection<InetAddress>> expectedEntry : expected.entrySet())
+        {
+            Collection<InetAddress> expectedEndpoints = expectedEntry.getValue();
+            Collection<InetAddress> actualEndpoints = actual.get(expectedEntry.getKey());
+            assertNotNull(actualEndpoints);
+            assertEquals(new ArrayList<>(expectedEndpoints), new ArrayList<>(actualEndpoints));
+        }
+    }
+
     /*
      * Test whether write endpoints is correct when the node is moving. Uses
      * StorageService.onChange and does not manipulate token metadata directly.
@@ -116,6 +549,8 @@ public class MoveTest
         for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
         {
             strategy = getStrategy(keyspaceName, tmd);
+            if(strategy instanceof NetworkTopologyStrategy)
+                continue;
             int numMoved = 0;
             for (Token token : keyTokens)
             {

Reply via email to