Repository: cassandra Updated Branches: refs/heads/trunk 210da3dc0 -> 0379201c7
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 8ae6853..2f412ad 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -105,7 +105,6 @@ public class BootStrapperTest InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); assertEquals(numOldNodes, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1); IFailureDetector mockFailureDetector = new IFailureDetector() { public boolean isAlive(InetAddressAndPort ep) @@ -120,26 +119,20 @@ public class BootStrapperTest public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } }; - s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector)); + RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1); assertNotNull(Keyspace.open(keyspaceName)); s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); - Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName); + Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName); // Check we get get RF new ranges in total - long rangesCount = toFetch.stream() - .map(Multimap::values) - .flatMap(Collection::stream) - .map(f -> f.remote) - .map(Replica::range) - .count(); - assertEquals(replicationFactor, rangesCount); + assertEquals(replicationFactor, toFetch.size()); // there isn't any point in testing the size of these collections for any specific size. When a random partitioner // is used, they will vary. - assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0; - assert toFetch.stream().map(Multimap::keySet).map(Collection::stream).noneMatch(myEndpoint::equals); + assert toFetch.values().size() > 0; + assert toFetch.keys().stream().noneMatch(myEndpoint::equals); return s; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java index 07d6377..cee4bb9 100644 --- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java @@ -195,18 +195,26 @@ public class RangeFetchMapCalculatorTest addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); //Return false for all except 127.0.0.5 - final Predicate<Replica> filter = replica -> + final RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter() { - try + public boolean apply(Replica replica) { - if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5"))) - return false; - else + try + { + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5"))) + return false; + else + return true; + } + catch (UnknownHostException e) + { return true; + } } - catch (UnknownHostException e) + + public String message(Replica replica) { - return true; + return "Doesn't match 127.0.0.5"; } }; @@ -230,7 +238,18 @@ public class RangeFetchMapCalculatorTest addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); - final Predicate<Replica> allDeadFilter = replica -> false; + final RangeStreamer.SourceFilter allDeadFilter = new RangeStreamer.SourceFilter() + { + public boolean apply(Replica replica) + { + return false; + } + + public String message(Replica replica) + { + return "All dead"; + } + }; RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test"); calculator.getRangeFetchMap(); @@ -263,18 +282,26 @@ public class RangeFetchMapCalculatorTest addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59"); //Reject only 127.0.0.3 and accept everyone else - final Predicate<Replica> localHostFilter = replica -> + final RangeStreamer.SourceFilter localHostFilter = new RangeStreamer.SourceFilter() { - try + public boolean apply(Replica replica) { - if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) - return false; - else + try + { + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) + return false; + else + return true; + } + catch (UnknownHostException e) + { return true; + } } - catch (UnknownHostException e) + + public String message(Replica replica) { - return true; + return "Not 127.0.0.3"; } }; @@ -318,18 +345,26 @@ public class RangeFetchMapCalculatorTest // and a trivial one: addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3"); - Predicate<Replica> filter = replica -> + RangeStreamer.SourceFilter filter = new RangeStreamer.SourceFilter() { - try + public boolean apply(Replica replica) { - if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) - return false; + try + { + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) + return false; + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + return true; } - catch (UnknownHostException e) + + public String message(Replica replica) { - throw new RuntimeException(e); + return "Not 127.0.0.3"; } - return true; }; RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test"); Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java index 4afeb5a..23d585f 100644 --- a/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/OldNetworkTopologyStrategyTest.java @@ -34,6 +34,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.RangeRelocator; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; @@ -368,7 +369,7 @@ public class OldNetworkTopologyStrategyTest RangesAtEndpoint currentRanges = strategy.getAddressReplicas().get(movingNode); RangesAtEndpoint updatedRanges = strategy.getPendingAddressRanges(tokenMetadataAfterMove, tokensAfterMove[movingNodeIdx], movingNode); - return asRanges(StorageService.calculateStreamAndFetchRanges(currentRanges, updatedRanges)); + return asRanges(RangeRelocator.calculateStreamAndFetchRanges(currentRanges, updatedRanges)); } private static Map<String, String> optsWithRF(int rf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java index 63973ea..0ee1f81 100644 --- a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java +++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaCollection; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.locator.Replica.fullReplica; @@ -60,29 +62,59 @@ import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqua */ public class BootstrapTransientTest { - static InetAddressAndPort aAddress; - static InetAddressAndPort bAddress; - static InetAddressAndPort cAddress; - static InetAddressAndPort dAddress; + static InetAddressAndPort address02; + static InetAddressAndPort address03; + static InetAddressAndPort address04; + static InetAddressAndPort address05; @BeforeClass public static void setUpClass() throws Exception { - aAddress = InetAddressAndPort.getByName("127.0.0.1"); - bAddress = InetAddressAndPort.getByName("127.0.0.2"); - cAddress = InetAddressAndPort.getByName("127.0.0.3"); - dAddress = InetAddressAndPort.getByName("127.0.0.4"); + address02 = InetAddressAndPort.getByName("127.0.0.2"); + address03 = InetAddressAndPort.getByName("127.0.0.3"); + address04 = InetAddressAndPort.getByName("127.0.0.4"); + address05 = InetAddressAndPort.getByName("127.0.0.5"); } private final List<InetAddressAndPort> downNodes = new ArrayList<>(); - Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint()); + + final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter() + { + public boolean apply(Replica replica) + { + return !downNodes.contains(replica.endpoint()); + } + + public String message(Replica replica) + { + return "Down nodes: " + downNodes; + } + }; + + final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter() + { + public boolean apply(Replica replica) + { + return !sourceFilterDownNodes.contains(replica.endpoint()); + } + + public String message(Replica replica) + { + return "Source filter down nodes" + sourceFilterDownNodes; + } + }; private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>(); - private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint())); + + private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate, + sourceFilterDownNodesPredicate, + new RangeStreamer.ExcludeLocalNodeFilter() + ); @After public void clearDownNode() { + // TODO: actually use these downNodes.clear(); sourceFilterDownNodes.clear(); } @@ -93,27 +125,43 @@ public class BootstrapTransientTest DatabaseDescriptor.daemonInitialization(); } - Token tenToken = new OrderPreservingPartitioner.StringToken("00010"); + Token tenToken = new OrderPreservingPartitioner.StringToken("00010"); Token twentyToken = new OrderPreservingPartitioner.StringToken("00020"); Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030"); Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040"); - Range<Token> aRange = new Range<>(thirtyToken, tenToken); - Range<Token> bRange = new Range<>(tenToken, twentyToken); - Range<Token> cRange = new Range<>(twentyToken, thirtyToken); - Range<Token> dRange = new Range<>(thirtyToken, fourtyToken); + Range<Token> range30_10 = new Range<>(thirtyToken, tenToken); + Range<Token> range10_20 = new Range<>(tenToken, twentyToken); + Range<Token> range20_30 = new Range<>(twentyToken, thirtyToken); + Range<Token> range30_40 = new Range<>(thirtyToken, fourtyToken); + + RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(address05, range30_40, true), + new Replica(address05, range20_30, true), + new Replica(address05, range10_20, false)); - RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, dRange, true), - new Replica(dAddress, cRange, true), - new Replica(dAddress, bRange, false)); + + public EndpointsForRange endpoints(Replica... replicas) + { + assert replicas.length > 0; + + Range<Token> range = replicas[0].range(); + EndpointsForRange.Builder builder = EndpointsForRange.builder(range); + for (Replica r : replicas) + { + assert r.range().equals(range); + builder.add(r); + } + + return builder.build(); + } @Test public void testRangeStreamerRangesToFetch() throws Exception { EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of( - fullReplica(dAddress, dRange), EndpointsForRange.builder(aRange).add(fullReplica(bAddress, aRange)).add(transientReplica(cAddress, aRange)).build(), - fullReplica(dAddress, cRange), EndpointsForRange.builder(cRange).add(fullReplica(cAddress, cRange)).add(transientReplica(bAddress, cRange)).build(), - transientReplica(dAddress, bRange), EndpointsForRange.builder(bRange).add(transientReplica(aAddress, bRange)).build())); + transientReplica(address05, range10_20), endpoints(transientReplica(address02, range10_20)), + fullReplica(address05, range20_30), endpoints(transientReplica(address03, range20_30), fullReplica(address04, range20_30)), + fullReplica(address05, range30_40), endpoints(transientReplica(address04, range30_10), fullReplica(address02, range30_10)))); invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult); } @@ -121,11 +169,11 @@ public class BootstrapTransientTest private Pair<TokenMetadata, TokenMetadata> constructTMDs() { TokenMetadata tmd = new TokenMetadata(); - tmd.updateNormalToken(aRange.right, aAddress); - tmd.updateNormalToken(bRange.right, bAddress); - tmd.updateNormalToken(cRange.right, cAddress); + tmd.updateNormalToken(range30_10.right, address02); + tmd.updateNormalToken(range10_20.right, address03); + tmd.updateNormalToken(range20_30.right, address04); TokenMetadata updated = tmd.cloneOnlyTokenMap(); - updated.updateNormalToken(dRange.right, dAddress); + updated.updateNormalToken(range30_40.right, address05); return Pair.create(tmd, updated); } @@ -137,14 +185,13 @@ public class BootstrapTransientTest DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas, - simpleStrategy(tmds.left), - toFetch, - true, - tmds.left, - tmds.right, - alivePredicate, - "OldNetworkTopologyStrategyTest", - sourceFilters); + simpleStrategy(tmds.left), + toFetch, + true, + tmds.left, + tmds.right, + "OldNetworkTopologyStrategyTest", + sourceFilters); result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list)); assertMultimapEqualsIgnoreOrder(expectedResult, result); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/MoveTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java index 1e24735..e5a63c7 100644 --- a/test/unit/org/apache/cassandra/service/MoveTransientTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java @@ -20,11 +20,12 @@ package org.apache.cassandra.service; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; -import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + import org.apache.cassandra.locator.EndpointsByReplica; import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.locator.RangesByEndpoint; @@ -64,27 +65,56 @@ public class MoveTransientTest { private static final Logger logger = LoggerFactory.getLogger(MoveTransientTest.class); - static InetAddressAndPort aAddress; - static InetAddressAndPort bAddress; - static InetAddressAndPort cAddress; - static InetAddressAndPort dAddress; - static InetAddressAndPort eAddress; + static InetAddressAndPort address01; + static InetAddressAndPort address02; + static InetAddressAndPort address03; + static InetAddressAndPort address04; + static InetAddressAndPort address05; @BeforeClass public static void setUpClass() throws Exception { - aAddress = InetAddressAndPort.getByName("127.0.0.1"); - bAddress = InetAddressAndPort.getByName("127.0.0.2"); - cAddress = InetAddressAndPort.getByName("127.0.0.3"); - dAddress = InetAddressAndPort.getByName("127.0.0.4"); - eAddress = InetAddressAndPort.getByName("127.0.0.5"); + address01 = InetAddressAndPort.getByName("127.0.0.1"); + address02 = InetAddressAndPort.getByName("127.0.0.2"); + address03 = InetAddressAndPort.getByName("127.0.0.3"); + address04 = InetAddressAndPort.getByName("127.0.0.4"); + address05 = InetAddressAndPort.getByName("127.0.0.5"); } - private final List<InetAddressAndPort> downNodes = new ArrayList(); - Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint()); + private final List<InetAddressAndPort> downNodes = new ArrayList<>(); + + final RangeStreamer.SourceFilter alivePredicate = new RangeStreamer.SourceFilter() + { + public boolean apply(Replica replica) + { + return !downNodes.contains(replica.endpoint()); + } + + public String message(Replica replica) + { + return "Down nodes: " + downNodes; + } + }; + + final RangeStreamer.SourceFilter sourceFilterDownNodesPredicate = new RangeStreamer.SourceFilter() + { + public boolean apply(Replica replica) + { + return !sourceFilterDownNodes.contains(replica.endpoint()); + } + + public String message(Replica replica) + { + return "Source filter down nodes: " + sourceFilterDownNodes; + } + }; private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>(); - private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint())); + + private final Collection<RangeStreamer.SourceFilter> sourceFilters = Arrays.asList(alivePredicate, + sourceFilterDownNodesPredicate, + new RangeStreamer.ExcludeLocalNodeFilter() + ); @After public void clearDownNode() @@ -99,27 +129,36 @@ public class MoveTransientTest DatabaseDescriptor.daemonInitialization(); } - Token oneToken = new RandomPartitioner.BigIntegerToken("1"); - Token twoToken = new RandomPartitioner.BigIntegerToken("2"); - Token threeToken = new RandomPartitioner.BigIntegerToken("3"); - Token fourToken = new RandomPartitioner.BigIntegerToken("4"); - Token sixToken = new RandomPartitioner.BigIntegerToken("6"); - Token sevenToken = new RandomPartitioner.BigIntegerToken("7"); - Token nineToken = new RandomPartitioner.BigIntegerToken("9"); - Token elevenToken = new RandomPartitioner.BigIntegerToken("11"); - Token fourteenToken = new RandomPartitioner.BigIntegerToken("14"); + final Token oneToken = new RandomPartitioner.BigIntegerToken("1"); + final Token twoToken = new RandomPartitioner.BigIntegerToken("2"); + final Token threeToken = new RandomPartitioner.BigIntegerToken("3"); + final Token fourToken = new RandomPartitioner.BigIntegerToken("4"); + final Token sixToken = new RandomPartitioner.BigIntegerToken("6"); + final Token sevenToken = new RandomPartitioner.BigIntegerToken("7"); + final Token nineToken = new RandomPartitioner.BigIntegerToken("9"); + final Token elevenToken = new RandomPartitioner.BigIntegerToken("11"); + final Token fourteenToken = new RandomPartitioner.BigIntegerToken("14"); - Range<Token> aRange = new Range(oneToken, threeToken); - Range<Token> bRange = new Range(threeToken, sixToken); - Range<Token> cRange = new Range(sixToken, nineToken); - Range<Token> dRange = new Range(nineToken, elevenToken); - Range<Token> eRange = new Range(elevenToken, oneToken); + final Range<Token> range_1_2 = new Range(oneToken, threeToken); + final Range<Token> range_3_6 = new Range(threeToken, sixToken); + final Range<Token> range_6_9 = new Range(sixToken, nineToken); + final Range<Token> range_9_11 = new Range(nineToken, elevenToken); + final Range<Token> range_11_1 = new Range(elevenToken, oneToken); - RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, aRange, true), - new Replica(aAddress, eRange, true), - new Replica(aAddress, dRange, false)); + final RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(address01, range_1_2, true), + new Replica(address01, range_11_1, true), + new Replica(address01, range_9_11, false)); + + public Token token(String s) + { + return new RandomPartitioner.BigIntegerToken(s); + } + public Range<Token> range(String start, String end) + { + return new Range<>(token(start), token(end)); + } /** * Ring with start A 1-3 B 3-6 C 6-9 D 9-1 @@ -140,14 +179,14 @@ public class MoveTransientTest Range<Token> aPrimeRange = new Range<>(oneToken, fourToken); RangesAtEndpoint updated = RangesAtEndpoint.of( - new Replica(aAddress, aPrimeRange, true), - new Replica(aAddress, eRange, true), - new Replica(aAddress, dRange, false) + new Replica(address01, aPrimeRange, true), + new Replica(address01, range_11_1, true), + new Replica(address01, range_9_11, false) ); - Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated); assertContentsIgnoreOrder(result.left); - assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, fourToken)); + assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, fourToken)); return result; } @@ -170,15 +209,15 @@ public class MoveTransientTest Range<Token> aPrimeRange = new Range<>(elevenToken, fourteenToken); RangesAtEndpoint updated = RangesAtEndpoint.of( - new Replica(aAddress, aPrimeRange, true), - new Replica(aAddress, dRange, true), - new Replica(aAddress, cRange, false) + new Replica(address01, aPrimeRange, true), + new Replica(address01, range_9_11, true), + new Replica(address01, range_6_9, false) ); - Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); - assertContentsIgnoreOrder(result.left, fullReplica(aAddress, oneToken, threeToken), fullReplica(aAddress, fourteenToken, oneToken)); - assertContentsIgnoreOrder(result.right, transientReplica(aAddress, sixToken, nineToken), fullReplica(aAddress, nineToken, elevenToken)); + Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated); + assertContentsIgnoreOrder(result.left, fullReplica(address01, oneToken, threeToken), fullReplica(address01, fourteenToken, oneToken)); + assertContentsIgnoreOrder(result.right, transientReplica(address01, sixToken, nineToken), fullReplica(address01, nineToken, elevenToken)); return result; } @@ -200,16 +239,16 @@ public class MoveTransientTest Range<Token> aPrimeRange = new Range<>(oneToken, twoToken); RangesAtEndpoint updated = RangesAtEndpoint.of( - new Replica(aAddress, aPrimeRange, true), - new Replica(aAddress, eRange, true), - new Replica(aAddress, dRange, false) + new Replica(address01, aPrimeRange, true), + new Replica(address01, range_11_1, true), + new Replica(address01, range_9_11, false) ); - Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated); //Moving backwards has no impact on any replica. We already fully replicate counter clockwise //The transient replica does transiently replicate slightly more, but that is addressed by cleanup - assertContentsIgnoreOrder(result.left, fullReplica(aAddress, twoToken, threeToken)); + assertContentsIgnoreOrder(result.left, fullReplica(address01, twoToken, threeToken)); assertContentsIgnoreOrder(result.right); return result; @@ -226,17 +265,16 @@ public class MoveTransientTest Range<Token> aPrimeRange = new Range<>(sixToken, sevenToken); Range<Token> bPrimeRange = new Range<>(oneToken, sixToken); - RangesAtEndpoint updated = RangesAtEndpoint.of( - new Replica(aAddress, aPrimeRange, true), - new Replica(aAddress, bPrimeRange, true), - new Replica(aAddress, eRange, false) + new Replica(address01, aPrimeRange, true), + new Replica(address01, bPrimeRange, true), + new Replica(address01, range_11_1, false) ); - Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + Pair<RangesAtEndpoint, RangesAtEndpoint> result = RangeRelocator.calculateStreamAndFetchRanges(current, updated); - assertContentsIgnoreOrder(result.left, fullReplica(aAddress, elevenToken, oneToken), transientReplica(aAddress, nineToken, elevenToken)); - assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, sixToken), fullReplica(aAddress, sixToken, sevenToken)); + assertContentsIgnoreOrder(result.left, fullReplica(address01, elevenToken, oneToken), transientReplica(address01, nineToken, elevenToken)); + assertContentsIgnoreOrder(result.right, fullReplica(address01, threeToken, sixToken), fullReplica(address01, sixToken, sevenToken)); return result; } @@ -252,6 +290,37 @@ public class MoveTransientTest calculateStreamAndFetchRangesMoveForwardBetween(); } + @Test + public void testResubtract() + { + Token oneToken = new RandomPartitioner.BigIntegerToken("0001"); + Token tenToken = new RandomPartitioner.BigIntegerToken("0010"); + Token fiveToken = new RandomPartitioner.BigIntegerToken("0005"); + + Range<Token> range_1_10 = new Range<>(oneToken, tenToken); + Range<Token> range_1_5 = new Range<>(oneToken, tenToken); + Range<Token> range_5_10 = new Range<>(fiveToken, tenToken); + + RangesAtEndpoint singleRange = RangesAtEndpoint.of( + new Replica(address01, range_1_10, true) + ); + + RangesAtEndpoint splitRanges = RangesAtEndpoint.of( + new Replica(address01, range_1_5, true), + new Replica(address01, range_5_10, true) + ); + + // forward + Pair<RangesAtEndpoint, RangesAtEndpoint> calculated = RangeRelocator.calculateStreamAndFetchRanges(singleRange, splitRanges); + assertTrue(calculated.left.toString(), calculated.left.isEmpty()); + assertTrue(calculated.right.toString(), calculated.right.isEmpty()); + + // backward + calculated = RangeRelocator.calculateStreamAndFetchRanges(splitRanges, singleRange); + assertTrue(calculated.left.toString(), calculated.left.isEmpty()); + assertTrue(calculated.right.toString(), calculated.right.isEmpty()); + } + /** * Construct the ring state for calculateStreamAndFetchRangesMoveBackwardBetween * Where are A moves from 3 to 14 @@ -260,12 +329,12 @@ public class MoveTransientTest private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackwardBetween() { TokenMetadata tmd = new TokenMetadata(); - tmd.updateNormalToken(aRange.right, aAddress); - tmd.updateNormalToken(bRange.right, bAddress); - tmd.updateNormalToken(cRange.right, cAddress); - tmd.updateNormalToken(dRange.right, dAddress); - tmd.updateNormalToken(eRange.right, eAddress); - tmd.addMovingEndpoint(fourteenToken, aAddress); + tmd.updateNormalToken(range_1_2.right, address01); + tmd.updateNormalToken(range_3_6.right, address02); + tmd.updateNormalToken(range_6_9.right, address03); + tmd.updateNormalToken(range_9_11.right, address04); + tmd.updateNormalToken(range_11_1.right, address05); + tmd.addMovingEndpoint(fourteenToken, address01); TokenMetadata updated = tmd.cloneAfterAllSettled(); return Pair.create(tmd, updated); @@ -280,12 +349,12 @@ public class MoveTransientTest private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForwardBetween() { TokenMetadata tmd = new TokenMetadata(); - tmd.updateNormalToken(aRange.right, aAddress); - tmd.updateNormalToken(bRange.right, bAddress); - tmd.updateNormalToken(cRange.right, cAddress); - tmd.updateNormalToken(dRange.right, dAddress); - tmd.updateNormalToken(eRange.right, eAddress); - tmd.addMovingEndpoint(sevenToken, aAddress); + tmd.updateNormalToken(range_1_2.right, address01); + tmd.updateNormalToken(range_3_6.right, address02); + tmd.updateNormalToken(range_6_9.right, address03); + tmd.updateNormalToken(range_9_11.right, address04); + tmd.updateNormalToken(range_11_1.right, address05); + tmd.addMovingEndpoint(sevenToken, address01); TokenMetadata updated = tmd.cloneAfterAllSettled(); return Pair.create(tmd, updated); @@ -294,12 +363,12 @@ public class MoveTransientTest private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward() { TokenMetadata tmd = new TokenMetadata(); - tmd.updateNormalToken(aRange.right, aAddress); - tmd.updateNormalToken(bRange.right, bAddress); - tmd.updateNormalToken(cRange.right, cAddress); - tmd.updateNormalToken(dRange.right, dAddress); - tmd.updateNormalToken(eRange.right, eAddress); - tmd.addMovingEndpoint(twoToken, aAddress); + tmd.updateNormalToken(range_1_2.right, address01); + tmd.updateNormalToken(range_3_6.right, address02); + tmd.updateNormalToken(range_6_9.right, address03); + tmd.updateNormalToken(range_9_11.right, address04); + tmd.updateNormalToken(range_11_1.right, address05); + tmd.addMovingEndpoint(twoToken, address01); TokenMetadata updated = tmd.cloneAfterAllSettled(); return Pair.create(tmd, updated); @@ -308,12 +377,12 @@ public class MoveTransientTest private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward() { TokenMetadata tmd = new TokenMetadata(); - tmd.updateNormalToken(aRange.right, aAddress); - tmd.updateNormalToken(bRange.right, bAddress); - tmd.updateNormalToken(cRange.right, cAddress); - tmd.updateNormalToken(dRange.right, dAddress); - tmd.updateNormalToken(eRange.right, eAddress); - tmd.addMovingEndpoint(fourToken, aAddress); + tmd.updateNormalToken(range_1_2.right, address01); + tmd.updateNormalToken(range_3_6.right, address02); + tmd.updateNormalToken(range_6_9.right, address03); + tmd.updateNormalToken(range_9_11.right, address04); + tmd.updateNormalToken(range_11_1.right, address05); + tmd.addMovingEndpoint(fourToken, address01); TokenMetadata updated = tmd.cloneAfterAllSettled(); return Pair.create(tmd, updated); @@ -325,15 +394,15 @@ public class MoveTransientTest { EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); - InetAddressAndPort cOrB = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress; + InetAddressAndPort cOrB = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03; //Need to pull the full replica and the transient replica that is losing the range - expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), fullReplica(dAddress, sixToken, nineToken)); - expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), transientReplica(eAddress, sixToken, nineToken)); + expectedResult.put(fullReplica(address01, sixToken, sevenToken), fullReplica(address04, sixToken, nineToken)); + expectedResult.put(fullReplica(address01, sixToken, sevenToken), transientReplica(address05, sixToken, nineToken)); //Same need both here as well - expectedResult.put(fullReplica(aAddress, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken)); - expectedResult.put(fullReplica(aAddress, threeToken, sixToken), transientReplica(dAddress, threeToken, sixToken)); + expectedResult.put(fullReplica(address01, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken)); + expectedResult.put(fullReplica(address01, threeToken, sixToken), transientReplica(address04, threeToken, sixToken)); invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right, constructTMDsMoveForwardBetween(), @@ -343,7 +412,7 @@ public class MoveTransientTest @Test public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception { - for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress}) + for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 }) { downNodes.clear(); downNodes.add(downNode); @@ -356,8 +425,7 @@ public class MoveTransientTest { ise.printStackTrace(); assertTrue(downNode.toString(), - ise.getMessage().startsWith("A node required to move the data consistently is down:") - && ise.getMessage().contains(downNode.toString())); + ise.getMessage().contains("Down nodes: [" + downNode + "]")); threw = true; } assertTrue("Didn't throw for " + downNode, threw); @@ -365,14 +433,14 @@ public class MoveTransientTest //Shouldn't throw because another full replica is available downNodes.clear(); - downNodes.add(cAddress); + downNodes.add(address03); testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @Test public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception { - for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress}) + for (InetAddressAndPort downNode : new InetAddressAndPort[] { address04, address05 }) { sourceFilterDownNodes.clear(); sourceFilterDownNodes.add(downNode); @@ -394,7 +462,7 @@ public class MoveTransientTest //Shouldn't throw because another full replica is available sourceFilterDownNodes.clear(); - sourceFilterDownNodes.add(cAddress); + sourceFilterDownNodes.add(address03); testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @@ -404,8 +472,8 @@ public class MoveTransientTest EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); //Need to pull the full replica and the transient replica that is losing the range - expectedResult.put(fullReplica(aAddress, nineToken, elevenToken), fullReplica(eAddress, nineToken, elevenToken)); - expectedResult.put(transientReplica(aAddress, sixToken, nineToken), transientReplica(eAddress, sixToken, nineToken)); + expectedResult.put(fullReplica(address01, nineToken, elevenToken), fullReplica(address05, nineToken, elevenToken)); + expectedResult.put(transientReplica(address01, sixToken, nineToken), transientReplica(address05, sixToken, nineToken)); invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right, constructTMDsMoveBackwardBetween(), @@ -417,7 +485,7 @@ public class MoveTransientTest public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception { //Any replica can be the full replica so this will always fail on the transient range - downNodes.add(eAddress); + downNodes.add(address05); testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @@ -425,7 +493,7 @@ public class MoveTransientTest public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception { //Any replica can be the full replica so this will always fail on the transient range - sourceFilterDownNodes.add(eAddress); + sourceFilterDownNodes.add(address05); testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @@ -448,11 +516,11 @@ public class MoveTransientTest { EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); - InetAddressAndPort cOrBAddress = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress; + InetAddressAndPort cOrBAddress = (downNodes.contains(address03) || sourceFilterDownNodes.contains(address03)) ? address02 : address03; //Need to pull the full replica and the transient replica that is losing the range - expectedResult.put(fullReplica(aAddress, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken)); - expectedResult.put(fullReplica(aAddress, threeToken, fourToken), transientReplica(dAddress, threeToken, sixToken)); + expectedResult.put(fullReplica(address01, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken)); + expectedResult.put(fullReplica(address01, threeToken, fourToken), transientReplica(address04, threeToken, sixToken)); invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right, constructTMDsMoveForward(), @@ -463,7 +531,7 @@ public class MoveTransientTest @Test public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception { - downNodes.add(dAddress); + downNodes.add(address04); boolean threw = false; try { @@ -472,23 +540,22 @@ public class MoveTransientTest catch (IllegalStateException ise) { ise.printStackTrace(); - assertTrue(dAddress.toString(), - ise.getMessage().startsWith("A node required to move the data consistently is down:") - && ise.getMessage().contains(dAddress.toString())); + assertTrue(address04.toString(), + ise.getMessage().contains("Down nodes: [" + address04 + "]")); threw = true; } - assertTrue("Didn't throw for " + dAddress, threw); + assertTrue("Didn't throw for " + address04, threw); //Shouldn't throw because another full replica is available downNodes.clear(); - downNodes.add(cAddress); + downNodes.add(address03); testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @Test public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception { - sourceFilterDownNodes.add(dAddress); + sourceFilterDownNodes.add(address04); boolean threw = false; try { @@ -497,16 +564,16 @@ public class MoveTransientTest catch (IllegalStateException ise) { ise.printStackTrace(); - assertTrue(dAddress.toString(), + assertTrue(address04.toString(), ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:") - && ise.getMessage().contains(dAddress.toString())); + && ise.getMessage().contains(address04.toString())); threw = true; } - assertTrue("Didn't throw for " + dAddress, threw); + assertTrue("Didn't throw for " + address04, threw); //Shouldn't throw because another full replica is available sourceFilterDownNodes.clear(); - sourceFilterDownNodes.add(cAddress); + sourceFilterDownNodes.add(address03); testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); } @@ -517,18 +584,16 @@ public class MoveTransientTest DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas.sorted((a, b) -> b.endpoint().compareTo(a.endpoint())), - simpleStrategy(tmds.left), - toFetch, - true, - tmds.left, - tmds.right, - alivePredicate, - "OldNetworkTopologyStrategyTest", - sourceFilters); + simpleStrategy(tmds.left), + toFetch, + true, + tmds.left, + tmds.right, + "OldNetworkTopologyStrategyTest", + sourceFilters); logger.info("Ranges to fetch with preferred endpoints"); logger.info(result.toString()); assertMultimapEqualsIgnoreOrder(expectedResult, result); - } private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd) @@ -564,8 +629,8 @@ public class MoveTransientTest RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); //Need to pull the full replica and the transient replica that is losing the range - expectedResult.put(bAddress, transientReplica(bAddress, nineToken, elevenToken)); - expectedResult.put(bAddress, fullReplica(bAddress, elevenToken, oneToken)); + expectedResult.put(address02, transientReplica(address02, nineToken, elevenToken)); + expectedResult.put(address02, fullReplica(address02, elevenToken, oneToken)); invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left, constructTMDsMoveForwardBetween(), @@ -577,12 +642,12 @@ public class MoveTransientTest { RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); - expectedResult.put(bAddress, fullReplica(bAddress, fourteenToken, oneToken)); + expectedResult.put(address02, fullReplica(address02, fourteenToken, oneToken)); - expectedResult.put(dAddress, transientReplica(dAddress, oneToken, threeToken)); + expectedResult.put(address04, transientReplica(address04, oneToken, threeToken)); - expectedResult.put(cAddress, fullReplica(cAddress, oneToken, threeToken)); - expectedResult.put(cAddress, transientReplica(cAddress, fourteenToken, oneToken)); + expectedResult.put(address03, fullReplica(address03, oneToken, threeToken)); + expectedResult.put(address03, transientReplica(address03, fourteenToken, oneToken)); invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left, constructTMDsMoveBackwardBetween(), @@ -593,8 +658,8 @@ public class MoveTransientTest public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception { RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); - expectedResult.put(cAddress, fullReplica(cAddress, twoToken, threeToken)); - expectedResult.put(dAddress, transientReplica(dAddress, twoToken, threeToken)); + expectedResult.put(address03, fullReplica(address03, twoToken, threeToken)); + expectedResult.put(address04, transientReplica(address04, twoToken, threeToken)); invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left, constructTMDsMoveBackward(), @@ -617,7 +682,7 @@ public class MoveTransientTest RangesByEndpoint expectedResult) { DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); - StorageService.RangeRelocator relocator = new StorageService.RangeRelocator(); + RangeRelocator relocator = new RangeRelocator(); RangesByEndpoint result = relocator.calculateRangesToStreamWithEndpoints(toStream, simpleStrategy(tmds.left), tmds.left, @@ -631,8 +696,10 @@ public class MoveTransientTest { assertEquals(ranges.size(), replicas.length); for (Replica replica : replicas) + { if (!ranges.contains(replica)) - assertEquals(RangesAtEndpoint.of(replicas), ranges); + assertTrue(Iterables.elementsEqual(RangesAtEndpoint.of(replicas), ranges)); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/test/unit/org/apache/cassandra/service/StorageServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java index 9d5c324..cc7fac3 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java @@ -38,6 +38,7 @@ import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.locator.TokenMetadata; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class StorageServiceTest { @@ -106,21 +107,32 @@ public class StorageServiceTest public static <K, C extends ReplicaCollection<? extends C>> void assertMultimapEqualsIgnoreOrder(ReplicaMultimap<K, C> a, ReplicaMultimap<K, C> b) { if (!a.keySet().equals(b.keySet())) - assertEquals(a, b); + fail(formatNeq(a, b)); for (K key : a.keySet()) { C ac = a.get(key); C bc = b.get(key); if (ac.size() != bc.size()) - assertEquals(a, b); + fail(formatNeq(a, b)); for (Replica r : ac) { if (!bc.contains(r)) - assertEquals(a, b); + fail(formatNeq(a, b)); } } } + public static String formatNeq(Object v1, Object v2) + { + return "\nExpected: " + formatClassAndValue(v1) + "\n but was: " + formatClassAndValue(v2); + } + + public static String formatClassAndValue(Object value) + { + String className = value == null ? "null" : value.getClass().getName(); + return className + "<" + String.valueOf(value) + ">"; + } + @Test public void testGetChangedReplicasForLeaving() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org