Update NTS calculateNaturalEndpoints to be O(N log N). Patch by Sam Overton; reviewed by jbellis for CASSANDRA-3881.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9688a79d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9688a79d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9688a79d Branch: refs/heads/trunk Commit: 9688a79d0c315395772d15d92e051d00e18b966b Parents: 893d1da Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Jul 3 11:58:23 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Jul 3 11:58:23 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/locator/NetworkTopologyStrategy.java | 123 ++++++++++----- .../locator/NetworkTopologyStrategyTest.java | 62 +++++++- 3 files changed, 140 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f36e9f..4ce9884 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881) * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366) * split up rpc timeout by operation type (CASSANDRA-2819) * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index 7b2ec91..30629d8 100644 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -27,8 +27,10 @@ import 
org.slf4j.LoggerFactory; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.TokenMetadata.Topology; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; + +import com.google.common.collect.Multimap; /** * This Replication Strategy takes a property file that gives the intended @@ -71,59 +73,96 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy logger.debug("Configured datacenter replicas are {}", FBUtilities.toString(datacenters)); } + /** + * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc. + */ + @SuppressWarnings("serial") public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata) { - List<InetAddress> endpoints = new ArrayList<InetAddress>(getReplicationFactor()); - - for (Entry<String, Integer> dcEntry : datacenters.entrySet()) + Set<InetAddress> replicas = new HashSet<InetAddress>(); + // replicas we have found in each DC + Map<String, Set<InetAddress>> dcReplicas = new HashMap<String, Set<InetAddress>>(datacenters.size()) + {{ + for (Map.Entry<String, Integer> dc : datacenters.entrySet()) + put(dc.getKey(), new HashSet<InetAddress>(dc.getValue())); + }}; + Topology topology = tokenMetadata.getTopology(); + // all endpoints in each DC, so we can check when we have exhausted all the members of a DC + Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints(); + // all racks in a DC so we can check when we have exhausted all racks in a DC + Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks(); + assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members"; + + // tracks the racks we have already placed replicas in + Map<String, Set<String>> seenRacks = new HashMap<String, Set<String>>(datacenters.size()) + {{ + for (Map.Entry<String, Integer> dc : 
datacenters.entrySet()) + put(dc.getKey(), new HashSet<String>()); + }}; + // tracks the endpoints that we skipped over while looking for unique racks + // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator + Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<String, Set<InetAddress>>(datacenters.size()) + {{ + for (Map.Entry<String, Integer> dc : datacenters.entrySet()) + put(dc.getKey(), new LinkedHashSet<InetAddress>()); + }}; + Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false); + while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints)) { - String dcName = dcEntry.getKey(); - int dcReplicas = dcEntry.getValue(); - - // collect endpoints in this DC; add in bulk to token meta data for computational complexity - // reasons (CASSANDRA-3831). - Set<Pair<Token, InetAddress>> dcTokensToUpdate = new HashSet<Pair<Token, InetAddress>>(); - for (Entry<Token, InetAddress> tokenEntry : tokenMetadata.getTokenToEndpointMapForReading().entrySet()) + Token next = tokenIter.next(); + InetAddress ep = tokenMetadata.getEndpoint(next); + String dc = snitch.getDatacenter(ep); + // have we already found all replicas for this dc? + if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints)) + continue; + // can we skip checking the rack? 
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size()) { - if (snitch.getDatacenter(tokenEntry.getValue()).equals(dcName)) - dcTokensToUpdate.add(Pair.create(tokenEntry.getKey(), tokenEntry.getValue())); + dcReplicas.get(dc).add(ep); + replicas.add(ep); } - TokenMetadata dcTokens = new TokenMetadata(); - dcTokens.updateNormalTokens(dcTokensToUpdate); - - List<InetAddress> dcEndpoints = new ArrayList<InetAddress>(dcReplicas); - Set<String> racks = new HashSet<String>(); - // first pass: only collect replicas on unique racks - for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false); - dcEndpoints.size() < dcReplicas && iter.hasNext(); ) + else { - Token token = iter.next(); - InetAddress endpoint = dcTokens.getEndpoint(token); - String rack = snitch.getRack(endpoint); - if (!racks.contains(rack)) + String rack = snitch.getRack(ep); + // is this a new rack? + if (seenRacks.get(dc).contains(rack)) { - dcEndpoints.add(endpoint); - racks.add(rack); + skippedDcEndpoints.get(dc).add(ep); + } + else + { + dcReplicas.get(dc).add(ep); + replicas.add(ep); + seenRacks.get(dc).add(rack); + // if we've run out of distinct racks, add the hosts we skipped past already (up to RF) + if (seenRacks.get(dc).size() == racks.get(dc).keySet().size()) + { + Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator(); + while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints)) + { + InetAddress nextSkipped = skippedIt.next(); + dcReplicas.get(dc).add(nextSkipped); + replicas.add(nextSkipped); + } + } } } + } - // second pass: if replica count has not been achieved from unique racks, add nodes from duplicate racks - for (Iterator<Token> iter = TokenMetadata.ringIterator(dcTokens.sortedTokens(), searchToken, false); - dcEndpoints.size() < dcReplicas && iter.hasNext(); ) - { - Token token = iter.next(); - InetAddress endpoint = dcTokens.getEndpoint(token); - if (!dcEndpoints.contains(endpoint)) - 
dcEndpoints.add(endpoint); - } + return new ArrayList<InetAddress>(replicas); + } - if (logger.isDebugEnabled()) - logger.debug("{} endpoints in datacenter {} for token {} ", - new Object[] { StringUtils.join(dcEndpoints, ","), dcName, searchToken}); - endpoints.addAll(dcEndpoints); - } + private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints) + { + return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc)); + } - return endpoints; + private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints) + { + for (String dc : datacenters.keySet()) + if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints)) + return false; + return true; } public int getReplicationFactor() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9688a79d/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java index 04cffbb..9e3d684 100644 --- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java @@ -25,24 +25,32 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; -import javax.xml.parsers.ParserConfigurationException; +import java.util.Set; + +import junit.framework.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.StringToken; import org.apache.cassandra.dht.Token; -import 
org.xml.sax.SAXException; +import org.apache.cassandra.utils.Pair; public class NetworkTopologyStrategyTest { private String table = "Keyspace1"; + private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class); @Test - public void testProperties() throws IOException, ParserConfigurationException, SAXException, ConfigurationException + public void testProperties() throws IOException, ConfigurationException { IEndpointSnitch snitch = new PropertyFileSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); TokenMetadata metadata = new TokenMetadata(); createDummyTokens(metadata, true); @@ -63,9 +71,10 @@ public class NetworkTopologyStrategyTest } @Test - public void testPropertiesWithEmptyDC() throws IOException, ParserConfigurationException, SAXException, ConfigurationException + public void testPropertiesWithEmptyDC() throws IOException, ConfigurationException { IEndpointSnitch snitch = new PropertyFileSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); TokenMetadata metadata = new TokenMetadata(); createDummyTokens(metadata, false); @@ -85,6 +94,51 @@ public class NetworkTopologyStrategyTest assert 6 == new HashSet<InetAddress>(endpoints).size(); // ensure uniqueness } + @Test + public void testLargeCluster() throws UnknownHostException, ConfigurationException + { + int[] dcRacks = new int[]{2, 4, 8}; + int[] dcEndpoints = new int[]{128, 256, 512}; + int[] dcReplication = new int[]{2, 6, 6}; + + IEndpointSnitch snitch = new RackInferringSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); + TokenMetadata metadata = new TokenMetadata(); + Map<String, String> configOptions = new HashMap<String, String>(); + Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>(); + + int totalRF = 0; + for (int dc = 0; dc < dcRacks.length; ++dc) + { + totalRF += dcReplication[dc]; + configOptions.put(Integer.toString(dc), Integer.toString(dcReplication[dc])); + for (int rack = 0; rack < dcRacks[dc]; ++rack) + { + 
for (int ep = 1; ep <= dcEndpoints[dc]/dcRacks[dc]; ++ep) + { + byte[] ipBytes = new byte[]{10, (byte)dc, (byte)rack, (byte)ep}; + InetAddress address = InetAddress.getByAddress(ipBytes); + StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc)); + logger.debug("adding node " + address + " at " + token); + tokens.add(new Pair<Token, InetAddress>(token, address)); + } + } + } + metadata.updateNormalTokens(tokens); + + NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(table, metadata, snitch, configOptions); + + for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"}) + { + List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata); + Set<InetAddress> epSet = new HashSet<InetAddress>(endpoints); + + Assert.assertEquals(totalRF, endpoints.size()); + Assert.assertEquals(totalRF, epSet.size()); + logger.debug(testToken + ": " + endpoints.toString()); + } + } + public void createDummyTokens(TokenMetadata metadata, boolean populateDC3) throws UnknownHostException { // DC 1