Fix streaming too much data during move operations

patch by Fabien Rousseau; reviewed by Paul Cannon for CASSANDRA-3639
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a277fbed Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a277fbed Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a277fbed Branch: refs/heads/cassandra-1.1 Commit: a277fbedb9ac5d109527848a77f470f22ea1ff00 Parents: 9a336a1 Author: paul cannon <p...@datastax.com> Authored: Tue Jan 31 13:44:37 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Feb 7 16:55:47 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/dht/Range.java | 6 + .../apache/cassandra/service/StorageService.java | 61 ++-- .../locator/OldNetworkTopologyStrategyTest.java | 218 ++++++++++++++- 4 files changed, 250 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 03f3fca..9768867 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1-dev + * Fix streaming too much data during move operations (CASSANDRA-3639) * Nodetool and CLI connect to localhost by default (CASSANDRA-3568) * Reduce memory used by primary index sample (CASSANDRA-3743) * (Hadoop) separate input/output configurations (CASSANDRA-3197, 3765) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 13335d1..4478214 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -264,6 +264,12 @@ public class Range<T extends RingPosition> extends AbstractBounds<T> implements 
return difference; } + public Set<Range<T>> subtract(Range<T> rhs) + { + return rhs.differenceToFetch(this); + } + + /** * Calculate set of the difference ranges of given two ranges * (as current (A, B] and rhs is (C, D]) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 98c3e7f..fdb8d4a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2786,12 +2786,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe return latch; } - // see calculateStreamAndFetchRanges(Iterator, Iterator) for description - private Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated) - { - return calculateStreamAndFetchRanges(current.iterator(), updated.iterator()); - } - /** * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for table and ranges after move to new token) @@ -2800,42 +2794,47 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe * @param updated collection of the ranges after token is changed * @return pair of ranges to stream/fetch for given current and updated range collections */ - private Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Iterator<Range<Token>> current, Iterator<Range<Token>> updated) + public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated) { Set<Range<Token>> toStream = new HashSet<Range<Token>>(); Set<Range<Token>> toFetch = new HashSet<Range<Token>>(); - while (current.hasNext() && 
updated.hasNext()) - { - Range<Token> r1 = current.next(); - Range<Token> r2 = updated.next(); - // if ranges intersect we need to fetch only missing part - if (r1.intersects(r2)) + for (Range r1 : current) + { + boolean intersect = false; + for (Range r2 : updated) { - // adding difference ranges to fetch from a ring - toFetch.addAll(r1.differenceToFetch(r2)); - - // if current range is a sub-range of a new range we don't need to seed - // otherwise we need to seed parts of the current range - if (!r2.contains(r1)) + if (r1.intersects(r2)) { - // (A, B] & (C, D] - if (r1.left.compareTo(r2.left) < 0) // if A < C - { - toStream.add(new Range<Token>(r1.left, r2.left)); // seed (A, C] - } - - if (r1.right.compareTo(r2.right) > 0) // if B > D - { - toStream.add(new Range<Token>(r2.right, r1.right)); // seed (D, B] - } + // adding difference ranges to fetch from a ring + toStream.addAll(r1.subtract(r2)); + intersect = true; + break; } } - else // otherwise we need to fetch whole new range + if (!intersect) { toStream.add(r1); // should seed whole old range - toFetch.add(r2); + } + } + + for (Range r2 : updated) + { + boolean intersect = false; + for (Range r1 : current) + { + if (r2.intersects(r1)) + { + // adding difference ranges to fetch from a ring + toFetch.addAll(r2.subtract(r1)); + intersect = true; + break; + } + } + if (!intersect) + { + toFetch.add(r2); // should fetch whole old range } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a277fbed/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java index ab1a52b..a11a128 100644 --- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java @@ -19,22 +19,27 
@@ package org.apache.cassandra.locator; +import static org.junit.Assert.assertEquals; + import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; +import java.util.Set; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.dht.BigIntegerToken; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; +import org.junit.Before; +import org.junit.Test; public class OldNetworkTopologyStrategyTest extends SchemaLoader { @@ -162,4 +167,207 @@ public class OldNetworkTopologyStrategyTest extends SchemaLoader } } + /** + * test basic methods to move a node. For sure, it's not the best place, but it's easy to test + * + * @throws UnknownHostException + */ + @Test + public void testMoveLeft() throws UnknownHostException + { + // Moves to the left : nothing to fetch, last part to stream + + int movingNodeIdx = 1; + BigIntegerToken newToken = new BigIntegerToken("21267647932558653966460912964485513216"); + BigIntegerToken[] tokens = initTokens(); + BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken); + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken); + + assertEquals(ranges.left.iterator().next().left, tokensAfterMove[movingNodeIdx]); + assertEquals(ranges.left.iterator().next().right, tokens[movingNodeIdx]); + assertEquals("No data should be fetched", ranges.right.size(), 0); + + } + + @Test + public void testMoveRight() throws UnknownHostException + { + // Moves to the right : last part to fetch, nothing to stream + + int 
movingNodeIdx = 1; + BigIntegerToken newToken = new BigIntegerToken("35267647932558653966460912964485513216"); + BigIntegerToken[] tokens = initTokens(); + BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken); + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken); + + assertEquals("No data should be streamed", ranges.left.size(), 0); + assertEquals(ranges.right.iterator().next().left, tokens[movingNodeIdx]); + assertEquals(ranges.right.iterator().next().right, tokensAfterMove[movingNodeIdx]); + + } + + @Test + public void testMoveMiddleOfRing() throws UnknownHostException + { + // moves to another position in the middle of the ring : should stream all its data, and fetch all its new data + + int movingNodeIdx = 1; + int movingNodeIdxAfterMove = 4; + BigIntegerToken newToken = new BigIntegerToken("90070591730234615865843651857942052864"); + BigIntegerToken[] tokens = initTokens(); + BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken); + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken); + + // sort the results, so they can be compared + Range[] toStream = ranges.left.toArray(new Range[0]); + Range[] toFetch = ranges.right.toArray(new Range[0]); + Arrays.sort(toStream); + Arrays.sort(toFetch); + + // build expected ranges + Range[] toStreamExpected = new Range[2]; + toStreamExpected[0] = new Range(getToken(movingNodeIdx - 2, tokens), getToken(movingNodeIdx - 1, tokens)); + toStreamExpected[1] = new Range(getToken(movingNodeIdx - 1, tokens), getToken(movingNodeIdx, tokens)); + Arrays.sort(toStreamExpected); + Range[] toFetchExpected = new Range[2]; + toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, tokens), getToken(movingNodeIdxAfterMove, tokens)); + toFetchExpected[1] = new Range(getToken(movingNodeIdxAfterMove, 
tokensAfterMove), getToken(movingNodeIdx, tokensAfterMove)); + Arrays.sort(toFetchExpected); + + assertEquals(Arrays.equals(toStream, toStreamExpected), true); + assertEquals(Arrays.equals(toFetch, toFetchExpected), true); + } + + @Test + public void testMoveAfterNextNeighbors() throws UnknownHostException + { + // moves after its next neighbor in the ring + + int movingNodeIdx = 1; + int movingNodeIdxAfterMove = 2; + BigIntegerToken newToken = new BigIntegerToken("52535295865117307932921825928971026432"); + BigIntegerToken[] tokens = initTokens(); + BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken); + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken); + + + // sort the results, so they can be compared + Range[] toStream = ranges.left.toArray(new Range[0]); + Range[] toFetch = ranges.right.toArray(new Range[0]); + Arrays.sort(toStream); + Arrays.sort(toFetch); + + // build expected ranges + Range[] toStreamExpected = new Range[1]; + toStreamExpected[0] = new Range(getToken(movingNodeIdx - 2, tokens), getToken(movingNodeIdx - 1, tokens)); + Arrays.sort(toStreamExpected); + Range[] toFetchExpected = new Range[2]; + toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, tokens), getToken(movingNodeIdxAfterMove, tokens)); + toFetchExpected[1] = new Range(getToken(movingNodeIdxAfterMove, tokensAfterMove), getToken(movingNodeIdx, tokensAfterMove)); + Arrays.sort(toFetchExpected); + + assertEquals(Arrays.equals(toStream, toStreamExpected), true); + assertEquals(Arrays.equals(toFetch, toFetchExpected), true); + } + + @Test + public void testMoveBeforePreviousNeighbor() throws UnknownHostException + { + // moves before its previous neighbor in the ring + + int movingNodeIdx = 1; + int movingNodeIdxAfterMove = 7; + BigIntegerToken newToken = new BigIntegerToken("158873535527910577765226390751398592512"); + BigIntegerToken[] tokens = 
initTokens(); + BigIntegerToken[] tokensAfterMove = initTokensAfterMove(tokens, movingNodeIdx, newToken); + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = calculateStreamAndFetchRanges(tokens, tokensAfterMove, movingNodeIdx, newToken); + + Range[] toStream = ranges.left.toArray(new Range[0]); + Range[] toFetch = ranges.right.toArray(new Range[0]); + Arrays.sort(toStream); + Arrays.sort(toFetch); + + Range[] toStreamExpected = new Range[2]; + toStreamExpected[0] = new Range(getToken(movingNodeIdx, tokensAfterMove), getToken(movingNodeIdx - 1, tokensAfterMove)); + toStreamExpected[1] = new Range(getToken(movingNodeIdx - 1, tokens), getToken(movingNodeIdx, tokens)); + Arrays.sort(toStreamExpected); + Range[] toFetchExpected = new Range[1]; + toFetchExpected[0] = new Range(getToken(movingNodeIdxAfterMove - 1, tokens), getToken(movingNodeIdxAfterMove, tokens)); + Arrays.sort(toFetchExpected); + + System.out.println("toStream : " + Arrays.toString(toStream)); + System.out.println("toFetch : " + Arrays.toString(toFetch)); + System.out.println("toStreamExpected : " + Arrays.toString(toStreamExpected)); + System.out.println("toFetchExpected : " + Arrays.toString(toFetchExpected)); + + assertEquals(Arrays.equals(toStream, toStreamExpected), true); + assertEquals(Arrays.equals(toFetch, toFetchExpected), true); + } + + private BigIntegerToken[] initTokensAfterMove(BigIntegerToken[] tokens, + int movingNodeIdx, BigIntegerToken newToken) + { + BigIntegerToken[] tokensAfterMove = tokens.clone(); + tokensAfterMove[movingNodeIdx] = newToken; + return tokensAfterMove; + } + + private BigIntegerToken[] initTokens() + { + BigIntegerToken[] tokens = new BigIntegerToken[] { + new BigIntegerToken("0"), // just to be able to test + new BigIntegerToken("34028236692093846346337460743176821145"), + new BigIntegerToken("42535295865117307932921825928971026432"), + new BigIntegerToken("63802943797675961899382738893456539648"), + new 
BigIntegerToken("85070591730234615865843651857942052864"), + new BigIntegerToken("106338239662793269832304564822427566080"), + new BigIntegerToken("127605887595351923798765477786913079296"), + new BigIntegerToken("148873535527910577765226390751398592512") + }; + return tokens; + } + + private TokenMetadata initTokenMetadata(BigIntegerToken[] tokens) + throws UnknownHostException + { + TokenMetadata tokenMetadataCurrent = new TokenMetadata(); + + int lastIPPart = 1; + for (BigIntegerToken token : tokens) + tokenMetadataCurrent.updateNormalToken(token, InetAddress.getByName("254.0.0." + Integer.toString(lastIPPart++))); + + return tokenMetadataCurrent; + } + + private BigIntegerToken getToken(int idx, BigIntegerToken[] tokens) + { + if (idx >= tokens.length) + idx = idx % tokens.length; + while (idx < 0) + idx += tokens.length; + + return tokens[idx]; + + } + + private Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(BigIntegerToken[] tokens, BigIntegerToken[] tokensAfterMove, int movingNodeIdx, BigIntegerToken newToken) throws UnknownHostException + { + RackInferringSnitch endpointSnitch = new RackInferringSnitch(); + + InetAddress movingNode = InetAddress.getByName("254.0.0." + Integer.toString(movingNodeIdx + 1)); + + + TokenMetadata tokenMetadataCurrent = initTokenMetadata(tokens); + TokenMetadata tokenMetadataAfterMove = initTokenMetadata(tokensAfterMove); + AbstractReplicationStrategy strategy = new OldNetworkTopologyStrategy("Keyspace1", tokenMetadataCurrent, endpointSnitch, KSMetaData.optsWithRF(2)); + + Collection<Range<Token>> currentRanges = strategy.getAddressRanges().get(movingNode); + Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode); + + Pair<Set<Range<Token>>, Set<Range<Token>>> ranges = StorageService.instance.calculateStreamAndFetchRanges(currentRanges, updatedRanges); + + return ranges; + } + + }