Repository: cassandra
Updated Branches:
  refs/heads/trunk 29ec013c2 -> c000da135

Improve NTS endpoints calculation

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-10200


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c000da13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c000da13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c000da13

Branch: refs/heads/trunk
Commit: c000da13563907b99fe220a7c8bde3c1dec74ad5
Parents: 29ec013
Author: Branimir Lambov <branimir.lam...@datastax.com>
Authored: Wed Aug 26 16:08:57 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Nov 18 15:44:21 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../locator/NetworkTopologyStrategy.java        | 157 ++++++++------
 .../apache/cassandra/locator/TokenMetadata.java |  21 +-
 .../locator/NetworkTopologyStrategyTest.java    | 213 ++++++++++++++++++-
 4 files changed, 317 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2710ed3..77034ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
  * Improve performance of the folderSize function (CASSANDRA-10677)
  * Add support for type casting in selection clause (CASSANDRA-10310)
  * Added graphing option to cassandra-stress (CASSANDRA-7918)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 307a07f..9f74dcc 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import com.google.common.collect.Multimap;
 
@@ -48,14 +49,12 @@ import com.google.common.collect.Multimap;
  */
 public class NetworkTopologyStrategy extends AbstractReplicationStrategy
 {
-    private final IEndpointSnitch snitch;
     private final Map<String, Integer> datacenters;
     private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
 
     public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
-        this.snitch = snitch;
 
         Map<String, Integer> newDatacenters = new HashMap<String, Integer>();
         if (configOptions != null)
@@ -75,17 +74,78 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     }
 
     /**
-     * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc.
+     * Endpoint adder applying the replication rules for a given DC.
+     */
+    private static final class DatacenterEndpoints
+    {
+        /** List accepted endpoints get pushed into. */
+        Set<InetAddress> endpoints;
+        /**
+         * Racks encountered so far. Replicas are put into separate racks while possible.
+         * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
+         * clashing names aren't a problem.
+         */
+        Set<Pair<String, String>> racks;
+
+        /** Number of replicas left to fill from this DC. */
+        int rfLeft;
+        int acceptableRackRepeats;
+
+        DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddress> endpoints, Set<Pair<String, String>> racks)
+        {
+            this.endpoints = endpoints;
+            this.racks = racks;
+            // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
+            this.rfLeft = Math.min(rf, nodeCount);
+            // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
+            // and the difference is to be filled by the first encountered nodes.
+            acceptableRackRepeats = rf - rackCount;
+        }
+
+        /**
+         * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
+         * Returns true if the endpoint was added, and this datacenter does not require further replicas.
+         */
+        boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> location)
+        {
+            if (done())
+                return false;
+
+            if (racks.add(location))
+            {
+                // New rack.
+                --rfLeft;
+                boolean added = endpoints.add(ep);
+                assert added;
+                return done();
+            }
+            if (acceptableRackRepeats <= 0)
+                // There must be rfLeft distinct racks left, do not add any more rack repeats.
+                return false;
+            if (!endpoints.add(ep))
+                // Cannot repeat a node.
+                return false;
+            // Added a node that is from an already met rack to match RF when there aren't enough racks.
+            --acceptableRackRepeats;
+            --rfLeft;
+            return done();
+        }
+
+        boolean done()
+        {
+            assert rfLeft >= 0;
+            return rfLeft == 0;
+        }
+    }
+
+    /**
+     * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
      */
-    @SuppressWarnings("serial")
     public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
     {
         // we want to preserve insertion order so that the first added endpoint becomes primary
         Set<InetAddress> replicas = new LinkedHashSet<>();
-        // replicas we have found in each DC
-        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+        Set<Pair<String, String>> seenRacks = new HashSet<>();
 
         Topology topology = tokenMetadata.getTopology();
         // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
@@ -94,74 +154,45 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
 
-        // tracks the racks we have already placed replicas in
-        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            seenRacks.put(dc.getKey(), new HashSet<String>());
+        int dcsToFill = 0;
+        Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
+
+        // Create a DatacenterEndpoints object for each non-empty DC.
+        for (Map.Entry<String, Integer> en : datacenters.entrySet())
+        {
+            String dc = en.getKey();
+            int rf = en.getValue();
+            int nodeCount = sizeOrZero(allEndpoints.get(dc));
+
+            if (rf <= 0 || nodeCount <= 0)
+                continue;
 
-        // tracks the endpoints that we skipped over while looking for unique racks
-        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
-        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+            dcs.put(dc, dcEndpoints);
+            ++dcsToFill;
+        }
 
         Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
-        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
+        while (dcsToFill > 0 && tokenIter.hasNext())
         {
             Token next = tokenIter.next();
             InetAddress ep = tokenMetadata.getEndpoint(next);
-            String dc = snitch.getDatacenter(ep);
-            // have we already found all replicas for this dc?
-            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                continue;
-            // can we skip checking the rack?
-            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
-            {
-                dcReplicas.get(dc).add(ep);
-                replicas.add(ep);
-            }
-            else
-            {
-                String rack = snitch.getRack(ep);
-                // is this a new rack?
-                if (seenRacks.get(dc).contains(rack))
-                {
-                    skippedDcEndpoints.get(dc).add(ep);
-                }
-                else
-                {
-                    dcReplicas.get(dc).add(ep);
-                    replicas.add(ep);
-                    seenRacks.get(dc).add(rack);
-                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
-                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
-                    {
-                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
-                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                        {
-                            InetAddress nextSkipped = skippedIt.next();
-                            dcReplicas.get(dc).add(nextSkipped);
-                            replicas.add(nextSkipped);
-                        }
-                    }
-                }
-            }
+            Pair<String, String> location = topology.getLocation(ep);
+            DatacenterEndpoints dcEndpoints = dcs.get(location.left);
+            if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
+                --dcsToFill;
         }
-
-        return new ArrayList<InetAddress>(replicas);
+        return new ArrayList<>(replicas);
     }
 
-    private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    private int sizeOrZero(Multimap<?, ?> collection)
     {
-        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc));
+        return collection != null ? collection.asMap().size() : 0;
     }
 
-    private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    private int sizeOrZero(Collection<?> collection)
     {
-        for (String dc : datacenters.keySet())
-            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                return false;
-        return true;
+        return collection != null ? collection.size() : 0;
     }
 
     public int getReplicationFactor()

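For illustration, the per-DC rule that DatacenterEndpoints implements boils
down to the following minimal, self-contained sketch (hypothetical node and
rack names; plain strings stand in for InetAddress and Pair). With rf = 3 but
only two racks, exactly rf - rackCount = 1 rack repeat is acceptable, and it
is consumed by the first repeated rack met in ring order:

    import java.util.*;

    public class PlacementSketch
    {
        public static void main(String[] args)
        {
            // Candidates in ring order: (node, rack) within one hypothetical DC.
            String[][] ring = { {"n1", "r1"}, {"n2", "r1"}, {"n3", "r2"}, {"n4", "r2"} };
            int rf = 3, rackCount = 2, nodeCount = 4;

            Set<String> endpoints = new LinkedHashSet<>(); // insertion order = replica order
            Set<String> seenRacks = new HashSet<>();
            int rfLeft = Math.min(rf, nodeCount);          // effective RF for this DC
            int acceptableRackRepeats = rf - rackCount;    // <= 0 means no repeats allowed

            for (String[] c : ring)
            {
                if (rfLeft == 0)
                    break;                                 // this DC is done
                String node = c[0], rack = c[1];
                if (seenRacks.add(rack))
                {
                    // New rack: always accept the node.
                    endpoints.add(node);
                    --rfLeft;
                }
                else if (acceptableRackRepeats > 0 && endpoints.add(node))
                {
                    // Repeated rack: acceptable only while racks < rf.
                    --acceptableRackRepeats;
                    --rfLeft;
                }
            }
            System.out.println(endpoints);                 // prints [n1, n2, n3]
        }
    }
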

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index e65b53e..a3be9de 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -828,20 +828,20 @@ public class TokenMetadata
     public Token getPredecessor(Token token)
     {
-        List tokens = sortedTokens();
+        List<Token> tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
         // assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
         if (index < 0)
             index = -index-1;
-        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
+        return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
     }
 
     public Token getSuccessor(Token token)
     {
-        List tokens = sortedTokens();
+        List<Token> tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
         // assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
         if (index < 0)
             return (Token) tokens.get(-index-1);
-        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
+        return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
     }
 
     /** @return a copy of the bootstrapping tokens map */
@@ -902,7 +902,7 @@ public class TokenMetadata
         }
     }
 
-    public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
+    public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin)
     {
         assert ring.size() > 0;
         // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
@@ -930,7 +930,7 @@
     {
         if (ring.isEmpty())
             return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
-                              : Iterators.<Token>emptyIterator();
+                              : Collections.emptyIterator();
 
         final boolean insertMin = includeMin && !ring.get(0).isMinimum();
         final int startIndex = firstTokenIndex(ring, start, insertMin);
@@ -1279,5 +1279,14 @@
         {
             return dcRacks;
         }
+
+        /**
+         * @return The DC and rack of the given endpoint.
+         */
+        public Pair<String, String> getLocation(InetAddress addr)
+        {
+            return currentLocations.get(addr);
+        }
+
     }
 }

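The Topology.getLocation accessor added above replaces the two snitch calls
(getDatacenter and getRack) that the old loop made for every visited token
with a single map lookup. A hypothetical usage fragment (describeEndpoint is
illustrative only, not part of the patch):

    // One lookup yields both coordinates; as a plain Map.get it returns
    // null for endpoints the topology does not know about.
    static String describeEndpoint(TokenMetadata.Topology topology, InetAddress ep)
    {
        Pair<String, String> location = topology.getLocation(ep);
        return location == null ? "unknown" : location.left + "/" + location.right;
    }
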

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index bbfdd3b..3cba328 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -21,24 +21,26 @@ package org.apache.cassandra.locator;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
+
 import org.junit.Assert;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.service.StorageService;
 
 public class NetworkTopologyStrategyTest
 {
@@ -166,4 +168,203 @@ public class NetworkTopologyStrategyTest
         InetAddress add1 = InetAddress.getByAddress(bytes);
         metadata.updateNormalToken(token1, add1);
     }
+
+    @Test
+    public void testCalculateEndpoints() throws UnknownHostException
+    {
+        final int NODES = 100;
+        final int VNODES = 64;
+        final int RUNS = 10;
+        StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
+        List<InetAddress> nodes = new ArrayList<>(NODES);
+        for (byte i=0; i<NODES; ++i)
+            nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
+        for (int run=0; run<RUNS; ++run)
+        {
+            Random rand = new Random();
+            IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
+            DatabaseDescriptor.setEndpointSnitch(snitch);
+
+            TokenMetadata meta = new TokenMetadata();
+            for (int i=0; i<NODES; ++i)    // Nodes
+                for (int j=0; j<VNODES; ++j)    // tokens/vnodes per node
+                    meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
+            testEquivalence(meta, snitch, datacenters, rand);
+        }
+    }
+
+    void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
+    {
+        NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
+                                                                  datacenters.entrySet().stream().
+                                                                      collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
+        for (int i=0; i<1000; ++i)
+        {
+            Token token = Murmur3Partitioner.instance.getRandomToken(rand);
+            List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
+            List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+            if (endpointsDiffer(expected, actual))
+            {
+                System.err.println("Endpoints mismatch for token " + token);
+                System.err.println(" expected: " + expected);
+                System.err.println(" actual : " + actual);
+                Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
+            }
+        }
+    }
+
+    private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
+    {
+        // Because the old algorithm does not put the nodes in the correct order in the case where more replicas
+        // are required than there are racks in a dc, we accept different order as long as the primary
+        // replica is the same.
+        if (ep1.equals(ep2))
+            return false;
+        if (!ep1.get(0).equals(ep2.get(0)))
+            return true;
+        Set<InetAddress> s1 = new HashSet<>(ep1);
+        Set<InetAddress> s2 = new HashSet<>(ep2);
+        return !s1.equals(s2);
+    }
+
+    IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
+    {
+        final Map<InetAddress, String> nodeToRack = new HashMap<>();
+        final Map<InetAddress, String> nodeToDC = new HashMap<>();
+        Map<String, List<String>> racksPerDC = new HashMap<>();
+        datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
+        int rf = datacenters.values().stream().mapToInt(x -> x).sum();
+        String[] dcs = new String[rf];
+        int pos = 0;
+        for (Map.Entry<String, Integer> dce : datacenters.entrySet())
+        {
+            for (int i = 0; i < dce.getValue(); ++i)
+                dcs[pos++] = dce.getKey();
+        }
+
+        for (InetAddress node : nodes)
+        {
+            String dc = dcs[rand.nextInt(rf)];
+            List<String> racks = racksPerDC.get(dc);
+            String rack = racks.get(rand.nextInt(racks.size()));
+            nodeToRack.put(node, rack);
+            nodeToDC.put(node, dc);
+        }
+
+        return new AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddress endpoint)
+            {
+                return nodeToRack.get(endpoint);
+            }
+
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return nodeToDC.get(endpoint);
+            }
+        };
+    }
+
+    private List<String> randomRacks(int rf, Random rand)
+    {
+        int rc = rand.nextInt(rf * 3 - 1) + 1;
+        List<String> racks = new ArrayList<>(rc);
+        for (int i=0; i<rc; ++i)
+            racks.add(Integer.toString(i));
+        return racks;
+    }
+
+    // Copy of older endpoints calculation algorithm for comparison
+    public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
+    {
+        // we want to preserve insertion order so that the first added endpoint becomes primary
+        Set<InetAddress> replicas = new LinkedHashSet<>();
+        // replicas we have found in each DC
+        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+
+        Topology topology = tokenMetadata.getTopology();
+        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        // all racks in a DC so we can check when we have exhausted all racks in a DC
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+        // tracks the racks we have already placed replicas in
+        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            seenRacks.put(dc.getKey(), new HashSet<String>());
+
+        // tracks the endpoints that we skipped over while looking for unique racks
+        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+
+        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
+        {
+            Token next = tokenIter.next();
+            InetAddress ep = tokenMetadata.getEndpoint(next);
+            String dc = snitch.getDatacenter(ep);
+            // have we already found all replicas for this dc?
+            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                continue;
+            // can we skip checking the rack?
+            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+            {
+                dcReplicas.get(dc).add(ep);
+                replicas.add(ep);
+            }
+            else
+            {
+                String rack = snitch.getRack(ep);
+                // is this a new rack?
+                if (seenRacks.get(dc).contains(rack))
+                {
+                    skippedDcEndpoints.get(dc).add(ep);
+                }
+                else
+                {
+                    dcReplicas.get(dc).add(ep);
+                    replicas.add(ep);
+                    seenRacks.get(dc).add(rack);
+                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+                    {
+                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                        {
+                            InetAddress nextSkipped = skippedIt.next();
+                            dcReplicas.get(dc).add(nextSkipped);
+                            replicas.add(nextSkipped);
+                        }
+                    }
+                }
+            }
+        }
+
+        return new ArrayList<InetAddress>(replicas);
+    }
+
+    private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+    {
+        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
+    }
+
+    private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+    {
+        for (String dc : datacenters.keySet())
+            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                return false;
+        return true;
+    }
+
+    public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
+    {
+        Integer replicas = datacenters.get(dc);
+        return replicas == null ? 0 : replicas;
+    }
 }
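
The randomized test above checks the rewrite against a verbatim copy of the
old algorithm across many generated topologies. For reference, a minimal
hypothetical construction of the strategy outside the test harness ("dc1",
"dc2", the RFs, and the keyspace name are made-up values; per-DC replication
factors are passed as strings, as testEquivalence does via streams):

    Map<String, String> options = new HashMap<>();
    options.put("dc1", "3"); // RF 3 in dc1
    options.put("dc2", "2"); // RF 2 in dc2
    NetworkTopologyStrategy nts =
        new NetworkTopologyStrategy("ks", tokenMetadata, snitch, options); // throws ConfigurationException
    List<InetAddress> replicas = nts.calculateNaturalEndpoints(token, tokenMetadata);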