Fix streaming too much data during move operations
patch by Fabien Rousseau; reviewed by Paul Cannon for CASSANDRA-3639


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a277fbed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a277fbed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a277fbed

Branch: refs/heads/cassandra-1.1
Commit: a277fbedb9ac5d109527848a77f470f22ea1ff00
Parents: 9a336a1
Author: paul cannon <p...@datastax.com>
Authored: Tue Jan 31 13:44:37 2012 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Feb 7 16:55:47 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/dht/Range.java       |    6 +
 .../apache/cassandra/service/StorageService.java   |   61 ++--
 .../locator/OldNetworkTopologyStrategyTest.java    |  218 ++++++++++++++-
 4 files changed, 250 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03f3fca..9768867 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.1-dev
+ * Fix streaming too much data during move operations (CASSANDRA-3639)
  * Nodetool and CLI connect to localhost by default (CASSANDRA-3568)
  * Reduce memory used by primary index sample (CASSANDRA-3743)
  * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java 
b/src/java/org/apache/cassandra/dht/Range.java
index 13335d1..4478214 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -264,6 +264,12 @@ public class Range<T extends RingPosition> extends 
AbstractBounds<T> implements
         return difference;
     }
 
+    public Set<Range<T>> subtract(Range<T> rhs)
+    {
+        return rhs.differenceToFetch(this);
+    }
+
+
     /**
      * Calculate set of the difference ranges of given two ranges
      * (as current (A, B] and rhs is (C, D])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 98c3e7f..fdb8d4a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2786,12 +2786,6 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         return latch;
     }
 
-    // see calculateStreamAndFetchRanges(Iterator, Iterator) for description
-    private Pair<Set<Range<Token>>, Set<Range<Token>>> 
calculateStreamAndFetchRanges(Collection<Range<Token>> current, 
Collection<Range<Token>> updated)
-    {
-        return calculateStreamAndFetchRanges(current.iterator(), 
updated.iterator());
-    }
-
     /**
      * Calculate pair of ranges to stream/fetch for given two range collections
      * (current ranges for table and ranges after move to new token)
@@ -2800,42 +2794,47 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
      * @param updated collection of the ranges after token is changed
      * @return pair of ranges to stream/fetch for given current and updated 
range collections
      */
-    private Pair<Set<Range<Token>>, Set<Range<Token>>> 
calculateStreamAndFetchRanges(Iterator<Range<Token>> current, 
Iterator<Range<Token>> updated)
+    public Pair<Set<Range<Token>>, Set<Range<Token>>> 
calculateStreamAndFetchRanges(Collection<Range<Token>> current, 
Collection<Range<Token>> updated)
     {
         Set<Range<Token>> toStream = new HashSet<Range<Token>>();
         Set<Range<Token>> toFetch  = new HashSet<Range<Token>>();
 
-        while (current.hasNext() && updated.hasNext())
-        {
-            Range<Token> r1 = current.next();
-            Range<Token> r2 = updated.next();
 
-            // if ranges intersect we need to fetch only missing part
-            if (r1.intersects(r2))
+        for (Range r1 : current)
+        {
+            boolean intersect = false;
+            for (Range r2 : updated)
             {
-                // adding difference ranges to fetch from a ring
-                toFetch.addAll(r1.differenceToFetch(r2));
-
-                // if current range is a sub-range of a new range we don't 
need to seed
-                // otherwise we need to seed parts of the current range
-                if (!r2.contains(r1))
+                if (r1.intersects(r2))
                 {
-                    // (A, B] & (C, D]
-                    if (r1.left.compareTo(r2.left) < 0) // if A < C
-                    {
-                        toStream.add(new Range<Token>(r1.left, r2.left)); // 
seed (A, C]
-                    }
-
-                    if (r1.right.compareTo(r2.right) > 0) // if B > D
-                    {
-                        toStream.add(new Range<Token>(r2.right, r1.right)); // 
seed (D, B]
-                    }
+                    // adding difference ranges (current minus updated) to stream
+                    toStream.addAll(r1.subtract(r2));
+                    intersect = true;
+                    break;
                 }
             }
-            else // otherwise we need to fetch whole new range
+            if (!intersect)
             {
                 toStream.add(r1); // should seed whole old range
-                toFetch.add(r2);
+            }
+        }
+
+        for (Range r2 : updated)
+        {
+            boolean intersect = false;
+            for (Range r1 : current)
+            {
+                if (r2.intersects(r1))
+                {
+                    // adding difference ranges to fetch from a ring
+                    toFetch.addAll(r2.subtract(r1));
+                    intersect = true;
+                    break;
+                }
+            }
+            if (!intersect)
+            {
+                toFetch.add(r2); // should fetch whole new range
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
index ab1a52b..a11a128 100644
--- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java
@@ -19,22 +19,27 @@
 
 package org.apache.cassandra.locator;
 
+import static org.junit.Assert.assertEquals;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
+import java.util.Set;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.dht.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
+import org.junit.Before;
+import org.junit.Test;
 
 public class OldNetworkTopologyStrategyTest extends SchemaLoader
 {
@@ -162,4 +167,207 @@ public class OldNetworkTopologyStrategyTest extends 
SchemaLoader
         }
     }
 
+    /**
+     * Tests the basic methods used to move a node. Admittedly this is not the best place for these tests, but it is easy to test here.
+     *
+     * @throws UnknownHostException
+     */
+    @Test
+    public void testMoveLeft() throws UnknownHostException
+    {
+        // Moves to the left : nothing to fetch, last part to stream
+
+        int movingNodeIdx = 1;
+        BigIntegerToken newToken = new 
BigIntegerToken("21267647932558653966460912964485513216");
+        BigIntegerToken[] tokens = initTokens();
+        BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, 
movingNodeIdx, newToken);
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken);
+
+        assertEquals(ranges.left.iterator().next().left, 
tokensAfterMove[movingNodeIdx]);
+        assertEquals(ranges.left.iterator().next().right, 
tokens[movingNodeIdx]);
+        assertEquals("No data should be fetched", ranges.right.size(), 0);
+
+    }
+
+    @Test
+    public void testMoveRight() throws UnknownHostException
+    {
+        // Moves to the right : last part to fetch, nothing to stream
+
+        int movingNodeIdx = 1;
+        BigIntegerToken newToken = new 
BigIntegerToken("35267647932558653966460912964485513216");
+        BigIntegerToken[] tokens = initTokens();
+        BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, 
movingNodeIdx, newToken);
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken);
+
+        assertEquals("No data should be streamed", ranges.left.size(), 0);
+        assertEquals(ranges.right.iterator().next().left, 
tokens[movingNodeIdx]);
+        assertEquals(ranges.right.iterator().next().right, 
tokensAfterMove[movingNodeIdx]);
+
+    }
+
+    @Test
+    public void testMoveMiddleOfRing() throws UnknownHostException
+    {
+        // moves to another position in the middle of the ring : should stream 
all its data, and fetch all its new data
+
+        int movingNodeIdx = 1;
+        int movingNodeIdxAfterMove = 4;
+        BigIntegerToken newToken = new 
BigIntegerToken("90070591730234615865843651857942052864");
+        BigIntegerToken[] tokens = initTokens();
+        BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, 
movingNodeIdx, newToken);
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken);
+
+        // sort the results, so they can be compared
+        Range[] toStream = ranges.left.toArray(new Range[0]);
+        Range[] toFetch = ranges.right.toArray(new Range[0]);
+        Arrays.sort(toStream);
+        Arrays.sort(toFetch);
+
+        // build expected ranges
+        Range[] toStreamExpected = new Range[2];
+        toStreamExpected[0] = new Range(getToken(movingNodeIdx - 2, tokens), 
getToken(movingNodeIdx - 1, tokens));
+        toStreamExpected[1] = new Range(getToken(movingNodeIdx - 1, tokens), 
getToken(movingNodeIdx, tokens));
+        Arrays.sort(toStreamExpected);
+        Range[] toFetchExpected = new Range[2];
+        toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, 
tokens), getToken(movingNodeIdxAfterMove, tokens));
+        toFetchExpected[1] = new Range(getToken(movingNodeIdxAfterMove, 
tokensAfterMove), getToken(movingNodeIdx, tokensAfterMove));
+        Arrays.sort(toFetchExpected);
+
+        assertEquals(Arrays.equals(toStream, toStreamExpected), true);
+        assertEquals(Arrays.equals(toFetch, toFetchExpected), true);
+    }
+
+    @Test
+    public void testMoveAfterNextNeighbors() throws UnknownHostException
+    {
+        // moves after its next neighbor in the ring
+
+        int movingNodeIdx = 1;
+        int movingNodeIdxAfterMove = 2;
+        BigIntegerToken newToken = new 
BigIntegerToken("52535295865117307932921825928971026432");
+        BigIntegerToken[] tokens = initTokens();
+        BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, 
movingNodeIdx, newToken);
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken);
+
+
+        // sort the results, so they can be compared
+        Range[] toStream = ranges.left.toArray(new Range[0]);
+        Range[] toFetch = ranges.right.toArray(new Range[0]);
+        Arrays.sort(toStream);
+        Arrays.sort(toFetch);
+
+        // build expected ranges
+        Range[] toStreamExpected = new Range[1];
+        toStreamExpected[0] = new Range(getToken(movingNodeIdx - 2, tokens), 
getToken(movingNodeIdx - 1, tokens));
+        Arrays.sort(toStreamExpected);
+        Range[] toFetchExpected = new Range[2];
+        toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, 
tokens), getToken(movingNodeIdxAfterMove, tokens));
+        toFetchExpected[1] = new Range(getToken(movingNodeIdxAfterMove, 
tokensAfterMove), getToken(movingNodeIdx, tokensAfterMove));
+        Arrays.sort(toFetchExpected);
+
+        assertEquals(Arrays.equals(toStream, toStreamExpected), true);
+        assertEquals(Arrays.equals(toFetch, toFetchExpected), true);
+    }
+
+    @Test
+    public void testMoveBeforePreviousNeighbor() throws UnknownHostException
+    {
+        // moves before its previous neighbor in the ring
+
+        int movingNodeIdx = 1;
+        int movingNodeIdxAfterMove = 7;
+        BigIntegerToken newToken = new 
BigIntegerToken("158873535527910577765226390751398592512");
+        BigIntegerToken[] tokens = initTokens();
+        BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, 
movingNodeIdx, newToken);
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken);
+
+        Range[] toStream = ranges.left.toArray(new Range[0]);
+        Range[] toFetch = ranges.right.toArray(new Range[0]);
+        Arrays.sort(toStream);
+        Arrays.sort(toFetch);
+
+        Range[] toStreamExpected = new Range[2];
+        toStreamExpected[0] = new Range(getToken(movingNodeIdx, 
tokensAfterMove), getToken(movingNodeIdx - 1, tokensAfterMove));
+        toStreamExpected[1] = new Range(getToken(movingNodeIdx - 1, tokens), 
getToken(movingNodeIdx, tokens));
+        Arrays.sort(toStreamExpected);
+        Range[] toFetchExpected = new Range[1];
+        toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, 
tokens), getToken(movingNodeIdxAfterMove, tokens));
+        Arrays.sort(toFetchExpected);
+
+        System.out.println("toStream : " + Arrays.toString(toStream));
+        System.out.println("toFetch : " + Arrays.toString(toFetch));
+        System.out.println("toStreamExpected : " + 
Arrays.toString(toStreamExpected));
+        System.out.println("toFetchExpected : " + 
Arrays.toString(toFetchExpected));
+
+        assertEquals(Arrays.equals(toStream, toStreamExpected), true);
+        assertEquals(Arrays.equals(toFetch, toFetchExpected), true);
+    }
+
+    private BigIntegerToken[] initTokensAfterMove(BigIntegerToken[] tokens,
+            int movingNodeIdx, BigIntegerToken newToken)
+    {
+        BigIntegerToken[] tokensAfterMove = tokens.clone();
+        tokensAfterMove[movingNodeIdx] = newToken;
+        return tokensAfterMove;
+    }
+
+    private BigIntegerToken[] initTokens()
+    {
+        BigIntegerToken[] tokens = new BigIntegerToken[] {
+                new BigIntegerToken("0"), // just to be able to test
+                new BigIntegerToken("34028236692093846346337460743176821145"),
+                new BigIntegerToken("42535295865117307932921825928971026432"),
+                new BigIntegerToken("63802943797675961899382738893456539648"),
+                new BigIntegerToken("85070591730234615865843651857942052864"),
+                new BigIntegerToken("106338239662793269832304564822427566080"),
+                new BigIntegerToken("127605887595351923798765477786913079296"),
+                new BigIntegerToken("148873535527910577765226390751398592512")
+        };
+        return tokens;
+    }
+
+    private TokenMetadata initTokenMetadata(BigIntegerToken[] tokens)
+            throws UnknownHostException
+    {
+        TokenMetadata tokenMetadataCurrent = new TokenMetadata();
+
+        int lastIPPart = 1;
+        for (BigIntegerToken token : tokens)
+            tokenMetadataCurrent.updateNormalToken(token, 
InetAddress.getByName("254.0.0." + Integer.toString(lastIPPart++)));
+
+        return tokenMetadataCurrent;
+    }
+
+    private BigIntegerToken getToken(int idx, BigIntegerToken[] tokens)
+    {
+        if (idx >= tokens.length)
+            idx = idx % tokens.length;
+        while (idx < 0)
+            idx += tokens.length;
+
+        return tokens[idx];
+
+    }
+
+    private Pair<Set<Range<Token>>, Set<Range<Token>>> 
calculateStreamAndFetchRanges(BigIntegerToken[] tokens, BigIntegerToken[] 
tokensAfterMove, int movingNodeIdx, BigIntegerToken newToken) throws 
UnknownHostException
+    {
+        RackInferringSnitch endpointSnitch = new RackInferringSnitch();
+
+        InetAddress movingNode = InetAddress.getByName("254.0.0." + 
Integer.toString(movingNodeIdx + 1));
+
+
+        TokenMetadata tokenMetadataCurrent = initTokenMetadata(tokens);
+        TokenMetadata tokenMetadataAfterMove = 
initTokenMetadata(tokensAfterMove);
+        AbstractReplicationStrategy strategy = new 
OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, 
KSMetaData.optsWithRF(2));
+
+        Collection<Range<Token>> currentRanges = 
strategy.getAddressRanges().get(movingNode);
+        Collection<Range<Token>> updatedRanges = 
strategy.getPendingAddressRanges(tokenMetadataAfterMove, 
tokensAfterMove[movingNodeIdx], movingNode);
+
+        Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = 
StorageService.instance.calculateStreamAndFetchRanges(currentRanges, 
updatedRanges);
+
+        return ranges;
+    }
+
+
 }

Reply via email to