Author: jbellis Date: Thu Jul 1 17:20:39 2010 New Revision: 959726 URL: http://svn.apache.org/viewvc?rev=959726&view=rev Log: sanity-check replica count against number of nodes in the cluster. patch by mdennis; reviewed by jbellis for CASSANDRA-1191
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java cassandra/trunk/test/conf/cassandra-rack.properties cassandra/trunk/test/conf/cassandra.yaml cassandra/trunk/test/conf/datacenters.properties cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Jul 1 17:20:39 2010 @@ -20,7 +20,10 @@ package org.apache.cassandra.locator; import java.net.InetAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +71,11 @@ public abstract class AbstractReplicatio * @param searchToken the token the natural endpoints are requested for * @param table the table the natural endpoints are requested for * @return a copy of the natural endpoints for the given token and table + * @throws IllegalStateException if the number of requested replicas is greater than the number of known 
endpoints */ - public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table) + public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table) throws IllegalStateException { + int replicas = getReplicationFactor(table); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken); ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey); @@ -83,10 +88,23 @@ public abstract class AbstractReplicatio cachedEndpoints.put(cacheKey, endpoints); } + // calculateNaturalEndpoints should have checked this already, this is a safety + assert replicas <= endpoints.size(); + return new ArrayList<InetAddress>(endpoints); } - public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table); + /** + * calculate the natural endpoints for the given token, for the given table. + * + * @see #getNaturalEndpoints(org.apache.cassandra.dht.Token, String) + * + * @param searchToken the token the natural endpoints are requested for + * @param table the table the natural endpoints are requested for + * @return a copy of the natural endpoints for the given token and table + * @throws IllegalStateException if the number of requested replicas is greater than the number of known endpoints + */ + public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table) throws IllegalStateException; public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, @@ -95,7 +113,13 @@ public abstract class AbstractReplicatio { return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table); } - + + // instance method so test subclasses can override it + int getReplicationFactor(String table) + { + return DatabaseDescriptor.getReplicationFactor(table); + 
} + /** * returns <tt>Multimap</tt> of {live destination: ultimate targets}, where if target is not the same * as the destination, it is a "hinted" write, and will need to be sent to Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Thu Jul 1 17:20:39 2010 @@ -94,7 +94,7 @@ public class DatacenterShardStrategy ext public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table) { - int totalReplicas = getReplicationfactor(table); + int totalReplicas = getReplicationFactor(table); Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table)); Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>(); Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas); @@ -125,7 +125,7 @@ public class DatacenterShardStrategy ext } } - // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks + // second pass: if replica count has not been achieved from unique racks, add nodes from the same racks for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken); endpoints.size() < totalReplicas && iter.hasNext();) { @@ -143,10 +143,16 @@ public class DatacenterShardStrategy ext } } + for (Map.Entry<String, Integer> entry : remainingReplicas.entrySet()) + { + if (entry.getValue() > 0) + throw new IllegalStateException(String.format("datacenter (%s) has no more endpoints, (%s) replicas still needed", entry.getKey(), entry.getValue())); + } + 
return endpoints; } - public int getReplicationfactor(String table) + public int getReplicationFactor(String table) { int total = 0; for (int repFactor : datacenters.get(table).values()) Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Thu Jul 1 17:20:39 2010 @@ -46,7 +46,7 @@ public class RackAwareStrategy extends A public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) { - int replicas = DatabaseDescriptor.getReplicationFactor(table); + int replicas = getReplicationFactor(table); Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas); ArrayList<Token> tokens = metadata.sortedTokens(); @@ -100,6 +100,9 @@ public class RackAwareStrategy extends A if (!endpoints.contains(metadata.getEndpoint(t))) endpoints.add(metadata.getEndpoint(t)); } + + if (endpoints.size() < replicas) + throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size())); } return endpoints; Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Thu Jul 1 17:20:39 2010 @@ -31,7 +31,7 @@ import 
org.apache.cassandra.dht.Token; /** * This class returns the nodes responsible for a given * key but does not respect rack awareness. Basically - * returns the 3 nodes that lie right next to each other + * returns the RF nodes that lie right next to each other * on the ring. */ public class RackUnawareStrategy extends AbstractReplicationStrategy @@ -43,7 +43,7 @@ public class RackUnawareStrategy extends public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) { - int replicas = DatabaseDescriptor.getReplicationFactor(table); + int replicas = getReplicationFactor(table); ArrayList<Token> tokens = metadata.sortedTokens(); Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas); @@ -57,6 +57,9 @@ public class RackUnawareStrategy extends endpoints.add(metadata.getEndpoint(iter.next())); } + if (endpoints.size() < replicas) + throw new IllegalStateException(String.format("replication factor (%s) exceeds number of endpoints (%s)", replicas, endpoints.size())); + return endpoints; } } Modified: cassandra/trunk/test/conf/cassandra-rack.properties URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/conf/cassandra-rack.properties (original) +++ cassandra/trunk/test/conf/cassandra-rack.properties Thu Jul 1 17:20:39 2010 @@ -33,10 +33,13 @@ 10.20.114.15=DC2:RAC2 127.0.0.1=DC1:RAC1 -127.0.0.2=DC2:RAC2 -127.0.0.3=DC3:RAC3 -127.0.0.4=DC4:RAC4 -127.0.0.5=DC3:RAC3 +127.0.0.2=DC1:RAC2 +127.0.0.3=DC1:RAC3 +127.0.0.4=DC2:RAC4 +127.0.0.5=DC2:RAC5 +127.0.0.6=DC3:RAC6 +127.0.0.7=DC3:RAC7 +127.0.0.8=DC3:RAC8 # default for unknown nodes default=DC1:r1 Modified: cassandra/trunk/test/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=959726&r1=959725&r2=959726&view=diff 
============================================================================== --- cassandra/trunk/test/conf/cassandra.yaml (original) +++ cassandra/trunk/test/conf/cassandra.yaml Thu Jul 1 17:20:39 2010 @@ -100,3 +100,10 @@ keyspaces: - name: Super4 column_type: Super compare_subcolumns_with: TimeUUIDType + + - name: Keyspace5 + replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy + replication_factor: 2 + column_families: + + - name: Standard1 Modified: cassandra/trunk/test/conf/datacenters.properties URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/conf/datacenters.properties (original) +++ cassandra/trunk/test/conf/datacenters.properties Thu Jul 1 17:20:39 2010 @@ -22,5 +22,5 @@ Keyspace1\:DC1=3 Keyspace1\:DC2=2 Keyspace1\:DC3=1 Keyspace3\:DC1=3 -Keyspace3\:DC2=2 +Keyspace3\:DC2=1 Keyspace3\:DC3=1 \ No newline at end of file Modified: cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Thu Jul 1 17:20:39 2010 @@ -54,29 +54,32 @@ public class BootStrapperTest extends Cl { StorageService ss = StorageService.instance; - generateFakeEndpoints(3); + generateFakeEndpoints(5); + + InetAddress two = InetAddress.getByName("127.0.0.2"); + InetAddress three = InetAddress.getByName("127.0.0.3"); + InetAddress four = InetAddress.getByName("127.0.0.4"); + InetAddress five = InetAddress.getByName("127.0.0.5"); - InetAddress one = InetAddress.getByName("127.0.0.2"); - 
InetAddress two = InetAddress.getByName("127.0.0.3"); - InetAddress three = InetAddress.getByName("127.0.0.4"); Map<InetAddress, Double> load = new HashMap<InetAddress, Double>(); - load.put(one, 1.0); load.put(two, 2.0); load.put(three, 3.0); + load.put(four, 4.0); + load.put(five, 5.0); TokenMetadata tmd = ss.getTokenMetadata(); InetAddress source = BootStrapper.getBootstrapSource(tmd, load); - assert three.equals(source); + assert five.equals(source) : five + " != " + source; InetAddress myEndpoint = InetAddress.getByName("127.0.0.1"); - Range range3 = ss.getPrimaryRangeForEndpoint(three); - Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left, range3.right); - assert range3.contains(fakeToken); + Range range5 = ss.getPrimaryRangeForEndpoint(five); + Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range5.left, range5.right); + assert range5.contains(fakeToken); ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken))); tmd = ss.getTokenMetadata(); - InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load); - assert two.equals(source2) : source2; + InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load); + assert four.equals(source4) : four + " != " + source4; } @Test Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Thu Jul 1 17:20:39 2010 
@@ -48,6 +48,12 @@ public class ReplicationStrategyEndpoint tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1")); tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4")); + //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8")); } @Test @@ -74,30 +80,35 @@ public class ReplicationStrategyEndpoint public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception { - // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints. - // thus we are testing undefined behavior, at best. 
setup(stratClass); + ArrayList<InetAddress> initial; ArrayList<InetAddress> endpoints; endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); - assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); + assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); - // test token addition - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3")); + // test token addition, in DC2 before existing token + initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5")); endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); - assert endpoints.size() == 3 : StringUtils.join(endpoints, ","); + assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); + assert !endpoints.equals(initial); - // test token removal - tmd.removeEndpoint(InetAddress.getByName("127.0.0.2")); + // test token removal, newly created token + initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + tmd.removeEndpoint(InetAddress.getByName("127.0.0.5")); endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); - assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); + assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); + assert !endpoints.contains(InetAddress.getByName("127.0.0.5")); + assert !endpoints.equals(initial); // test token change - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5")); + initial = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + //move .8 after search token but before other DC3 + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8")); endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); - assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); - assert endpoints.contains(InetAddress.getByName("127.0.0.5")); - assert 
!endpoints.contains(InetAddress.getByName("127.0.0.3")); + assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); + assert !endpoints.equals(initial); } protected static class FakeRackUnawareStrategy extends RackUnawareStrategy Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Thu Jul 1 17:20:39 2010 @@ -65,7 +65,7 @@ public class AntiEntropyServiceTest exte public static void prepareClass() throws Exception { LOCAL = FBUtilities.getLocalAddress(); - tablename = "Keyspace4"; + tablename = "Keyspace5"; StorageService.instance.initServer(); // generate a fake endpoint for which we can spoof receiving/sending trees REMOTE = InetAddress.getByName("127.0.0.2"); Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=959726&r1=959725&r2=959726&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Thu Jul 1 17:20:39 2010 @@ -36,6 +36,7 @@ import org.apache.cassandra.locator.Abst import org.apache.cassandra.locator.RackUnawareStrategy; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.Pair; public class MoveTest extends CleanupHelper { @@ -48,17 +49,16 @@ public class MoveTest extends CleanupHel return 
replacements; } - /** * Test whether write endpoints is correct when the node is leaving. Uses * StorageService.onChange and does not manipulate token metadata directly. */ @Test - public void testWriteEndpointsDuringLeave() throws UnknownHostException + public void newTestWriteEndpointsDuringLeave() throws Exception { StorageService ss = StorageService.instance; - final int RING_SIZE = 5; - final int LEAVING_NODE = 2; + final int RING_SIZE = 6; + final int LEAVING_NODE = 3; TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); @@ -74,52 +74,47 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); - final Map<String, List<Range>> deadNodesRanges = new HashMap<String, List<Range>>(); + final Map<Pair<String, Token>, List<InetAddress>> expectedEndpoints = new HashMap<Pair<String, Token>, List<InetAddress>>(); for (String table : DatabaseDescriptor.getNonSystemTables()) { - List<Range> list = new ArrayList<Range>(); - list.addAll(testStrategy.getAddressRanges(table).get(hosts.get(LEAVING_NODE))); - Collections.sort(list); - deadNodesRanges.put(table, list); + for (Token token : keyTokens) + { + List<InetAddress> endpoints = new ArrayList<InetAddress>(); + Pair<String, Token> key = new Pair<String, Token>(table, token); + Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token); + while (tokenIter.hasNext()) + { + endpoints.add(tmd.getEndpoint(tokenIter.next())); + } + expectedEndpoints.put(key, endpoints); + } } - + // Third node leaves ss.onChange(hosts.get(LEAVING_NODE), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING_NODE)))); - - // check that it is correctly marked as leaving in tmd assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE))); - // check that pending ranges are correct (primary range should go to 1st node, first - // replica range 
to 4th node and 2nd replica range to 5th node) for (String table : DatabaseDescriptor.getNonSystemTables()) { - int replicationFactor = DatabaseDescriptor.getReplicationFactor(table); - - // if the ring minus the leaving node leaves us with less than RF, we're hosed. - if (hosts.size()-1 < replicationFactor) - continue; - - // verify that the replicationFactor nodes after the leaving node are gatherings it's pending ranges. - // in the case where rf==5, we're screwed because we basically just lost data. - for (int i = 0; i < replicationFactor; i++) + for (Token token : keyTokens) { - assertTrue(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).size() > 0); - assertEquals(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).get(0), deadNodesRanges.get(table).get(i)); - } + Pair<String, Token> key = new Pair<String, Token>(table, token); + int replicationFactor = DatabaseDescriptor.getReplicationFactor(table); - // note that we're iterating over nodes and sample tokens. - final int replicaStart = (LEAVING_NODE-replicationFactor+RING_SIZE)%RING_SIZE; - for (int i=0; i<keyTokens.size(); ++i) - { - Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); - // figure out if this node is one of the nodes previous to the failed node (2). 
- boolean isReplica = (i - replicaStart + RING_SIZE) % RING_SIZE < replicationFactor; - // the preceeding leaving_node-replication_factor nodes should have and additional ep (replication_factor+1); - if (isReplica) - assertTrue(endpoints.size() == replicationFactor + 1); - else - assertTrue(endpoints.size() == replicationFactor); + HashSet<InetAddress> actual = new HashSet<InetAddress>(tmd.getWriteEndpoints(token, table, testStrategy.calculateNaturalEndpoints(token, tmd, table))); + HashSet<InetAddress> expected = new HashSet<InetAddress>(); + for (int i = 0; i < replicationFactor; i++) + { + expected.add(expectedEndpoints.get(key).get(i)); + } + + // if the leaving node is in the endpoint list, + // then we should expect it plus one extra for when it's gone + if (expected.contains(hosts.get(LEAVING_NODE))) + expected.add(expectedEndpoints.get(key).get(replicationFactor)); + + assertEquals("mismatched endpoint sets", expected, actual); } } @@ -152,7 +147,7 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); // nodes 6, 8 and 9 leave - final int[] LEAVING = new int[] { 6, 8, 9}; + final int[] LEAVING = new int[] {6, 8, 9}; for (int leaving : LEAVING) ss.onChange(hosts.get(leaving), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(leaving)))); @@ -164,6 +159,9 @@ public class MoveTest extends CleanupHel Collection<InetAddress> endpoints = null; + /* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */ + List<String> tables = Arrays.asList("Keyspace1", "Keyspace2", "Keyspace3", "Keyspace4"); + // pre-calculate the results. 
Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>(); expectedEndpoints.put("Keyspace1", HashMultimap.<Token, InetAddress>create()); @@ -211,7 +209,7 @@ public class MoveTest extends CleanupHel expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3")); expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3")); - for (String table : DatabaseDescriptor.getNonSystemTables()) + for (String table : tables) { for (int i = 0; i < keyTokens.size(); i++) { @@ -326,7 +324,7 @@ public class MoveTest extends CleanupHel expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10")); expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10")); - for (String table : DatabaseDescriptor.getNonSystemTables()) + for (String table : tables) { for (int i = 0; i < keyTokens.size(); i++) { @@ -425,7 +423,7 @@ public class MoveTest extends CleanupHel List<InetAddress> hosts = new ArrayList<InetAddress>(); // create a ring or 5 nodes - createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5); + createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); // node 2 leaves ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2)))); @@ -494,7 +492,7 @@ public class MoveTest extends CleanupHel List<InetAddress> hosts = new ArrayList<InetAddress>(); // create a ring or 5 nodes - createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5); + createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); // node 2 leaves ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + 
StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2)))); @@ -538,7 +536,7 @@ public class MoveTest extends CleanupHel List<InetAddress> hosts = new ArrayList<InetAddress>(); // create a ring or 5 nodes - createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5); + createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); // node 2 leaves with _different_ token ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(0)))); @@ -587,8 +585,8 @@ public class MoveTest extends CleanupHel ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); - // create a ring or 5 nodes - createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 5); + // create a ring of 6 nodes + createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); // node hosts.get(2) goes jumps to left ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2))));