This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new bc18b4d  Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
bc18b4d is described below

commit bc18b4dd4e33020d0d58c3701077d0af5c39bce6
Author:     Robert Stupp <sn...@snazy.de>
AuthorDate: Wed Oct 31 12:48:19 2018 +0100

    Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser

    patch by Robert Stupp; reviewed by Blake Eggleston for CASSANDRA-14781
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   1 -
 .../db/compaction/AbstractCompactionStrategy.java  |  14 +-
 .../compaction/DateTieredCompactionStrategy.java   |  12 +-
 .../compaction/SizeTieredCompactionStrategy.java   |   8 +-
 .../compaction/TimeWindowCompactionStrategy.java   |   4 +-
 .../apache/cassandra/db/marshal/TypeParser.java    |  30 +-
 .../cassandra/locator/NetworkTopologyStrategy.java |   3 +-
 .../apache/cassandra/locator/TokenMetadata.java    | 354 ++++++++++++---------
 .../cassandra/locator/TokenMetadataTest.java       |  23 +-
 9 files changed, 269 insertions(+), 180 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3482909..355d710 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2138,7 +2138,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         assert data.getCompacting().isEmpty() : data.getCompacting();
         Iterable<SSTableReader> sstables = getLiveSSTables();
         sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
-        sstables = ImmutableList.copyOf(sstables);
         LifecycleTransaction modifier = data.tryModify(sstables, operationType);
         assert modifier != null: "something marked things compacting while compactions are disabled";
         return modifier;
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9f07691..2348d19 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -252,15 +252,15 @@ public abstract class AbstractCompactionStrategy
      * @param originalCandidates The collection to check for blacklisted SSTables
      * @return list of the SSTables with blacklisted ones filtered out
      */
-    public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
+    public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
     {
-        return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
+        List<SSTableReader> filtered = new ArrayList<>();
+        for (SSTableReader sstable : originalCandidates)
         {
-            public boolean apply(SSTableReader sstable)
-            {
-                return !sstable.isMarkedSuspect();
-            }
-        });
+            if (!sstable.isMarkedSuspect())
+                filtered.add(sstable);
+        }
+        return filtered;
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 3e6ae61..7c38fa8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -87,7 +87,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      * @param gcBefore
      * @return
      */
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
             return Collections.emptyList();
@@ -193,11 +193,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         });
     }

-    /**
-     *
-     * @param sstables
-     * @return
-     */
     public static List<Pair<SSTableReader, Long>> createSSTableAndMinTimestampPairs(Iterable<SSTableReader> sstables)
     {
         List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables));
@@ -205,14 +200,15 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
         return sstableMinTimestampPairs;
     }
+
     @Override
-    public void addSSTable(SSTableReader sstable)
+    public synchronized void addSSTable(SSTableReader sstable)
     {
         sstables.add(sstable);
     }

     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..80f5e8c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -74,7 +74,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
     }

-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         // make local copies so they can't be changed out from under us mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -190,7 +190,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     }

     @SuppressWarnings("resource")
-    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
     {
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(filteredSSTables))
@@ -316,13 +316,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     }

     @Override
-    public void addSSTable(SSTableReader added)
+    public synchronized void addSSTable(SSTableReader added)
     {
         sstables.add(added);
     }

     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 8d26d0c..c44d3aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -169,13 +169,13 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
     }

     @Override
-    public void addSSTable(SSTableReader sstable)
+    public synchronized void addSSTable(SSTableReader sstable)
     {
         sstables.add(sstable);
     }

     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
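[Editorial note] All three compaction strategies get the same treatment above: every method that reads or mutates the strategy's shared sstable set is now synchronized, so background candidate selection can no longer race with concurrent add/remove notifications. Below is a minimal sketch of that locking discipline; GuardedStrategy is a hypothetical class with String stand-ins for SSTableReader, not Cassandra code:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class GuardedStrategy
    {
        // HashSet is not thread-safe on its own; all access goes through
        // methods synchronized on the strategy instance.
        private final Set<String> sstables = new HashSet<>();

        public synchronized void addSSTable(String sstable)
        {
            sstables.add(sstable);
        }

        public synchronized void removeSSTable(String sstable)
        {
            sstables.remove(sstable);
        }

        // Candidate selection iterates the set; without the lock a concurrent
        // add/remove could throw ConcurrentModificationException or hand the
        // caller an inconsistent view of the live sstables.
        public synchronized List<String> getNextBackgroundSSTables()
        {
            return new ArrayList<>(sstables);
        }
    }
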
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 35d15ab..590eea3 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -23,6 +23,9 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;

+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -37,7 +40,7 @@ public class TypeParser
     private int idx;

     // A cache of parsed string, specially useful for DynamicCompositeType
-    private static final Map<String, AbstractType<?>> cache = new HashMap<>();
+    private static volatile ImmutableMap<String, AbstractType<?>> cache = ImmutableMap.of();

     public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);

@@ -60,6 +63,7 @@ public class TypeParser
         if (str == null)
             return BytesType.instance;

+        // A single volatile read of 'cache' should not hurt.
         AbstractType<?> type = cache.get(str);

         if (type != null)
@@ -83,9 +87,27 @@ public class TypeParser
         else
             type = getAbstractType(name);

-        // We don't really care about concurrency here. Worst case scenario, we do some parsing unnecessarily
-        cache.put(str, type);
-        return type;
+        Verify.verify(type != null, "Parsing %s yielded null, which is a bug", str);
+
+        // Prevent concurrent modification to the map acting as the cache for TypeParser at the expense of
+        // more allocation when the cache needs to be updated, since updates to the cache are rare compared
+        // to the amount of reads.
+        //
+        // Copy the existing cache into a new map and add the parsed AbstractType instance and replace
+        // the cache, if the type is not already in the cache.
+        //
+        // The cache-update is done in a short synchronized block to prevent duplicate instances of AbstractType
+        // for the same string representation.
+        synchronized (TypeParser.class)
+        {
+            if (!cache.containsKey(str))
+            {
+                ImmutableMap.Builder<String, AbstractType<?>> builder = ImmutableMap.builder();
+                builder.putAll(cache).put(str, type);
+                cache = builder.build();
+            }
+            return type;
+        }
     }

     public static AbstractType<?> parse(CharSequence compareWith) throws SyntaxException, ConfigurationException
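[Editorial note] The new TypeParser cache is a copy-on-write map: readers perform a single volatile read of an ImmutableMap that is never mutated, while the rare writer republishes a fresh map from inside a short synchronized block. A self-contained sketch of the idiom under assumed names (CowCache and expensiveParse are hypothetical, not part of the patch):

    import com.google.common.collect.ImmutableMap;

    final class CowCache
    {
        // Readers cost one volatile read; the published map is immutable.
        private static volatile ImmutableMap<String, Object> cache = ImmutableMap.of();

        static Object get(String key)
        {
            Object value = cache.get(key);
            if (value != null)
                return value;

            Object parsed = expensiveParse(key);

            // Writers serialize on a short critical section and publish a new
            // map, so concurrent readers never observe a half-updated cache.
            synchronized (CowCache.class)
            {
                if (!cache.containsKey(key))
                    cache = ImmutableMap.<String, Object>builder().putAll(cache).put(key, parsed).build();
                return cache.get(key);
            }
        }

        private static Object expensiveParse(String key)
        {
            return new Object(); // placeholder for real parsing work
        }
    }

Note one design nuance: this sketch returns the cached instance, so every caller sees one canonical value per key; the patch itself returns the locally parsed type, accepting that the loser of a benign parse race may briefly hold a duplicate instance that never enters the cache.
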
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 7c8d95e..82183bb 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.utils.FBUtilities;

+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;

 /**
@@ -91,7 +92,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
         Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
         // all racks in a DC so we can check when we have exhausted all racks in a DC
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";

         // tracks the racks we have already placed replicas in
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b44a1a1..3978eeb 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -95,9 +95,10 @@ public class TokenMetadata
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-    private volatile ArrayList<Token> sortedTokens;
+    private volatile ArrayList<Token> sortedTokens; // safe to be read without a lock, as it's never mutated
+
+    private volatile Topology topology;

-    private final Topology topology;
     public final IPartitioner partitioner;

     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
@@ -115,7 +116,7 @@ public class TokenMetadata
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
              HashBiMap.<InetAddress, UUID>create(),
-             new Topology(),
+             Topology.empty(),
              DatabaseDescriptor.getPartitioner());
     }
@@ -193,6 +194,7 @@ public class TokenMetadata
         try
         {
             boolean shouldSortTokens = false;
+            Topology.Builder topologyBuilder = topology.unbuild();
             for (InetAddress endpoint : endpointTokens.keySet())
             {
                 Collection<Token> tokens = endpointTokens.get(endpoint);
@@ -201,7 +203,7 @@ public class TokenMetadata

                 bootstrapTokens.removeValue(endpoint);
                 tokenToEndpointMap.removeValue(endpoint);
-                topology.addEndpoint(endpoint);
+                topologyBuilder.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
                 replacementToOriginal.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
@@ -217,6 +219,7 @@ public class TokenMetadata
                     }
                 }
             }
+            topology = topologyBuilder.build();

             if (shouldSortTokens)
                 sortedTokens = sortTokens();
@@ -381,12 +384,28 @@ public class TokenMetadata

     public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
     {
-        return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+        lock.readLock().lock();
+        try
+        {
+            return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }

     public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
     {
-        return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+        lock.readLock().lock();
+        try
+        {
+            return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }

     public void removeBootstrapTokens(Collection<Token> tokens)
@@ -430,7 +449,6 @@ public class TokenMetadata
         assert endpoint != null;

         lock.writeLock().lock();
-
         try
         {
             movingEndpoints.add(Pair.create(token, endpoint));
@@ -450,7 +468,7 @@ public class TokenMetadata
         {
             bootstrapTokens.removeValue(endpoint);
             tokenToEndpointMap.removeValue(endpoint);
-            topology.removeEndpoint(endpoint);
+            topology = topology.unbuild().removeEndpoint(endpoint).build();
             leavingEndpoints.remove(endpoint);
             if (replacementToOriginal.remove(endpoint) != null)
             {
@@ -469,7 +487,7 @@ public class TokenMetadata
     /**
      * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
      */
-    public void updateTopology(InetAddress endpoint)
+    public Topology updateTopology(InetAddress endpoint)
     {
         assert endpoint != null;

@@ -477,8 +495,9 @@ public class TokenMetadata
         try
         {
             logger.info("Updating topology for {}", endpoint);
-            topology.updateEndpoint(endpoint);
+            topology = topology.unbuild().updateEndpoint(endpoint).build();
             invalidateCachedRings();
+            return topology;
         }
         finally
         {
@@ -490,14 +509,15 @@ public class TokenMetadata
      * This is called when the snitch properties for many endpoints are updated, it will update
      * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
      */
-    public void updateTopology()
+    public Topology updateTopology()
     {
         lock.writeLock().lock();
         try
         {
             logger.info("Updating topology for all endpoints that have changed");
-            topology.updateEndpoints();
+            topology = topology.unbuild().updateEndpoints().build();
             invalidateCachedRings();
+            return topology;
         }
         finally
         {
@@ -590,7 +610,6 @@ public class TokenMetadata
         assert endpoint != null;

         lock.readLock().lock();
-
         try
         {
             for (Pair<Token, InetAddress> pair : movingEndpoints)
@@ -620,7 +639,7 @@ public class TokenMetadata
     {
         return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                  HashBiMap.create(endpointToHostIdMap),
-                                 new Topology(topology),
+                                 topology,
                                  partitioner);
     }
     finally
@@ -690,7 +709,6 @@ public class TokenMetadata
     public TokenMetadata cloneAfterAllSettled()
     {
         lock.readLock().lock();
-
         try
         {
             TokenMetadata metadata = cloneOnlyTokenMap();
@@ -807,50 +825,49 @@ public class TokenMetadata
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
         // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+        long startedAt = System.currentTimeMillis();
+
         synchronized (pendingRanges)
         {
-            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
+            // create clone of current state
+            BiMultiValMap<Token, InetAddress> bootstrapTokensClone;
+            Set<InetAddress> leavingEndpointsClone;
+            Set<Pair<Token, InetAddress>> movingEndpointsClone;
+            TokenMetadata metadata;

-                pendingRanges.put(keyspaceName, new PendingRangeMaps());
-            }
-            else
+            lock.readLock().lock();
+            try
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("Starting pending range calculation for {}", keyspaceName);
-
-                long startedAt = System.currentTimeMillis();
+                if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);

-                // create clone of current state
-                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
-                Set<InetAddress> leavingEndpoints = new HashSet<>();
-                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
-                TokenMetadata metadata;
+                    pendingRanges.put(keyspaceName, new PendingRangeMaps());

-                lock.readLock().lock();
-                try
-                {
-                    bootstrapTokens.putAll(this.bootstrapTokens);
-                    leavingEndpoints.addAll(this.leavingEndpoints);
-                    movingEndpoints.addAll(this.movingEndpoints);
-                    metadata = this.cloneOnlyTokenMap();
-                }
-                finally
-                {
-                    lock.readLock().unlock();
+                    return;
                 }

-                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
-                                                                       leavingEndpoints, movingEndpoints));
-                long took = System.currentTimeMillis() - startedAt;
-                if (logger.isDebugEnabled())
-                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
-                if (logger.isTraceEnabled())
-                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
+                logger.debug("Starting pending range calculation for {}", keyspaceName);
+
+                bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens);
+                leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
+                movingEndpointsClone = new HashSet<>(this.movingEndpoints);
+                metadata = this.cloneOnlyTokenMap();
             }
+            finally
+            {
+                lock.readLock().unlock();
+            }
+
+            pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
+                                                                   leavingEndpointsClone, movingEndpointsClone));
+            long took = System.currentTimeMillis() - startedAt;
+
+            if (logger.isDebugEnabled())
+                logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+            if (logger.isTraceEnabled())
+                logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
         }
     }
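[Editorial note] calculatePendingRanges now follows a snapshot-then-compute shape: shared state is cloned while briefly holding the read lock, the lock is released, and the expensive range math runs on the private copies. A hedged sketch of that shape, with a hypothetical SnapshotCompute class and a trivial placeholder computation:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SnapshotCompute
    {
        private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
        private final Set<String> leavingEndpoints = new HashSet<>();

        int recomputeExpensiveResult()
        {
            Set<String> leavingClone;

            lock.readLock().lock();
            try
            {
                // Only the cheap copy happens while holding the lock.
                leavingClone = new HashSet<>(leavingEndpoints);
            }
            finally
            {
                lock.readLock().unlock();
            }

            // The slow part runs on a private snapshot, so writers are not
            // blocked and the calculation sees a consistent view.
            return leavingClone.size(); // placeholder for the real range math
        }
    }
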
@@ -960,7 +977,7 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
         return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
     }
@@ -968,17 +985,30 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
         return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
     }

+    private String tokenToEndpointMapKeysAsStrings()
+    {
+        lock.readLock().lock();
+        try
+        {
+            return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     /** @return a copy of the bootstrapping tokens map */
     public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
     {
         lock.readLock().lock();
         try
         {
-            return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
+            return new BiMultiValMap<>(bootstrapTokens);
         }
         finally
         {
@@ -1103,7 +1133,7 @@ public class TokenMetadata
         pendingRanges.clear();
         movingEndpoints.clear();
         sortedTokens.clear();
-        topology.clear();
+        topology = Topology.empty();
         invalidateCachedRings();
     }
     finally
@@ -1271,130 +1301,170 @@ public class TokenMetadata
     public static class Topology
     {
         /** multi-map of DC to endpoints in that DC */
-        private final Multimap<String, InetAddress> dcEndpoints;
+        private final ImmutableMultimap<String, InetAddress> dcEndpoints;

         /** map of DC to multi-map of rack to endpoints in that rack */
-        private final Map<String, Multimap<String, InetAddress>> dcRacks;
+        private final ImmutableMap<String, ImmutableMultimap<String, InetAddress>> dcRacks;

         /** reverse-lookup map for endpoint to current known dc/rack assignment */
-        private final Map<InetAddress, Pair<String, String>> currentLocations;
+        private final ImmutableMap<InetAddress, Pair<String, String>> currentLocations;

-        Topology()
+        private Topology(Builder builder)
         {
-            dcEndpoints = HashMultimap.create();
-            dcRacks = new HashMap<>();
-            currentLocations = new HashMap<>();
-        }
+            this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints);

-        void clear()
-        {
-            dcEndpoints.clear();
-            dcRacks.clear();
-            currentLocations.clear();
+            ImmutableMap.Builder<String, ImmutableMultimap<String, InetAddress>> dcRackBuilder = ImmutableMap.builder();
+            for (Map.Entry<String, Multimap<String, InetAddress>> entry : builder.dcRacks.entrySet())
+                dcRackBuilder.put(entry.getKey(), ImmutableMultimap.copyOf(entry.getValue()));
+            this.dcRacks = dcRackBuilder.build();
+
+            this.currentLocations = ImmutableMap.copyOf(builder.currentLocations);
         }

         /**
-         * construct deep-copy of other
+         * @return multi-map of DC to endpoints in that DC
         */
-        Topology(Topology other)
+        public Multimap<String, InetAddress> getDatacenterEndpoints()
         {
-            dcEndpoints = HashMultimap.create(other.dcEndpoints);
-            dcRacks = new HashMap<>();
-            for (String dc : other.dcRacks.keySet())
-                dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-            currentLocations = new HashMap<>(other.currentLocations);
+            return dcEndpoints;
         }

         /**
-         * Stores current DC/rack assignment for ep
+         * @return map of DC to multi-map of rack to endpoints in that rack
         */
-        void addEndpoint(InetAddress ep)
+        public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
         {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            String dc = snitch.getDatacenter(ep);
-            String rack = snitch.getRack(ep);
-            Pair<String, String> current = currentLocations.get(ep);
-            if (current != null)
-            {
-                if (current.left.equals(dc) && current.right.equals(rack))
-                    return;
-                doRemoveEndpoint(ep, current);
-            }
-
-            doAddEndpoint(ep, dc, rack);
+            return dcRacks;
         }

-        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        Builder unbuild()
        {
-            dcEndpoints.put(dc, ep);
-
-            if (!dcRacks.containsKey(dc))
-                dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
-            dcRacks.get(dc).put(rack, ep);
-
-            currentLocations.put(ep, Pair.create(dc, rack));
+            return new Builder(this);
         }

-        /**
-         * Removes current DC/rack assignment for ep
-         */
-        void removeEndpoint(InetAddress ep)
+        static Builder builder()
         {
-            if (!currentLocations.containsKey(ep))
-                return;
-
-            doRemoveEndpoint(ep, currentLocations.remove(ep));
+            return new Builder();
         }

-        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        static Topology empty()
         {
-            dcRacks.get(current.left).remove(current.right, ep);
-            dcEndpoints.remove(current.left, ep);
+            return builder().build();
         }

-        void updateEndpoint(InetAddress ep)
+        private static class Builder
         {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            if (snitch == null || !currentLocations.containsKey(ep))
-                return;
+            /** multi-map of DC to endpoints in that DC */
+            private final Multimap<String, InetAddress> dcEndpoints;

-            updateEndpoint(ep, snitch);
-        }
+            /** map of DC to multi-map of rack to endpoints in that rack */
+            private final Map<String, Multimap<String, InetAddress>> dcRacks;

-        void updateEndpoints()
-        {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            if (snitch == null)
-                return;
+            /** reverse-lookup map for endpoint to current known dc/rack assignment */
+            private final Map<InetAddress, Pair<String, String>> currentLocations;

+            Builder()
+            {
+                this.dcEndpoints = HashMultimap.create();
+                this.dcRacks = new HashMap<>();
+                this.currentLocations = new HashMap<>();
+            }

+            Builder(Topology from)
+            {
+                this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
+
+                this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
+                for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
+                    dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
+
+                this.currentLocations = new HashMap<>(from.currentLocations);
+            }
+
+            /**
+             * Stores current DC/rack assignment for ep
+             */
+            Builder addEndpoint(InetAddress ep)
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                String dc = snitch.getDatacenter(ep);
+                String rack = snitch.getRack(ep);
+                Pair<String, String> current = currentLocations.get(ep);
+                if (current != null)
+                {
+                    if (current.left.equals(dc) && current.right.equals(rack))
+                        return this;
+                    doRemoveEndpoint(ep, current);
+                }
+
+                doAddEndpoint(ep, dc, rack);
+                return this;
+            }
+
+            private void doAddEndpoint(InetAddress ep, String dc, String rack)
+            {
+                dcEndpoints.put(dc, ep);
+
+                if (!dcRacks.containsKey(dc))
+                    dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+                dcRacks.get(dc).put(rack, ep);
+
+                currentLocations.put(ep, Pair.create(dc, rack));
+            }
+
+            /**
+             * Removes current DC/rack assignment for ep
+             */
+            Builder removeEndpoint(InetAddress ep)
+            {
+                if (!currentLocations.containsKey(ep))
+                    return this;
+
+                doRemoveEndpoint(ep, currentLocations.remove(ep));
+                return this;
+            }
+
+            private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+            {
+                dcRacks.get(current.left).remove(current.right, ep);
+                dcEndpoints.remove(current.left, ep);
+            }
+
+            Builder updateEndpoint(InetAddress ep)
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                if (snitch == null || !currentLocations.containsKey(ep))
+                    return this;

-        for (InetAddress ep : currentLocations.keySet())
             updateEndpoint(ep, snitch);
-    }
+                return this;
+            }

-        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
-        {
-            Pair<String, String> current = currentLocations.get(ep);
-            String dc = snitch.getDatacenter(ep);
-            String rack = snitch.getRack(ep);
-            if (dc.equals(current.left) && rack.equals(current.right))
-                return;
+            Builder updateEndpoints()
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                if (snitch == null)
+                    return this;

-            doRemoveEndpoint(ep, current);
-            doAddEndpoint(ep, dc, rack);
-        }
+                for (InetAddress ep : currentLocations.keySet())
+                    updateEndpoint(ep, snitch);

-        /**
-         * @return multi-map of DC to endpoints in that DC
-         */
-        public Multimap<String, InetAddress> getDatacenterEndpoints()
-        {
-            return dcEndpoints;
-        }
+                return this;
+            }

-        /**
-         * @return map of DC to multi-map of rack to endpoints in that rack
-         */
-        public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
-        {
-            return dcRacks;
+            private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+            {
+                Pair<String, String> current = currentLocations.get(ep);
+                String dc = snitch.getDatacenter(ep);
+                String rack = snitch.getRack(ep);
+                if (dc.equals(current.left) && rack.equals(current.right))
+                    return;
+
+                doRemoveEndpoint(ep, current);
+                doAddEndpoint(ep, dc, rack);
+            }
+
+            Topology build()
+            {
+                return new Topology(this);
+            }
         }
+    }
 }
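[Editorial note] Topology thus becomes an immutable value published through a volatile field: readers share a snapshot, and mutation happens on a throwaway Builder obtained via unbuild(), with build() publishing a fresh snapshot. A condensed sketch under assumed names (the Snapshot class below is hypothetical; the real Topology carries three maps plus snitch lookups):

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.ImmutableMultimap;
    import com.google.common.collect.Multimap;

    final class Snapshot
    {
        // Immutable: safe to hand out to concurrent readers without locking.
        private final ImmutableMultimap<String, String> dcEndpoints;

        private Snapshot(Builder builder)
        {
            this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints);
        }

        static Snapshot empty()
        {
            return new Builder().build();
        }

        Builder unbuild()
        {
            return new Builder(this);
        }

        static final class Builder
        {
            private final Multimap<String, String> dcEndpoints;

            Builder()
            {
                this.dcEndpoints = HashMultimap.create();
            }

            Builder(Snapshot from)
            {
                // Mutable working copy of the immutable snapshot.
                this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
            }

            Builder addEndpoint(String dc, String endpoint)
            {
                dcEndpoints.put(dc, endpoint);
                return this; // fluent, mirroring topology.unbuild().addEndpoint(...).build()
            }

            Snapshot build()
            {
                return new Snapshot(this);
            }
        }
    }

The read-copy-update of a volatile field only stays lossless if writers serialize among themselves; in TokenMetadata the surrounding writeLock provides exactly that.
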
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index e7bb70a..dab7082 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -22,6 +22,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Map;

+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;

@@ -29,18 +30,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;

-import static junit.framework.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.cassandra.Util.token;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.token;
+
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class TokenMetadataTest
@@ -139,7 +140,7 @@ public class TokenMetadataTest
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));

-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -171,7 +172,7 @@ public class TokenMetadataTest
         });

         tokenMetadata.updateTopology(first);
-        tokenMetadata.updateTopology(second);
+        topology = tokenMetadata.updateTopology(second);

         allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);
@@ -237,7 +238,7 @@ public class TokenMetadataTest
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));

-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -268,7 +269,7 @@ public class TokenMetadataTest
             }
         });

-        tokenMetadata.updateTopology();
+        topology = tokenMetadata.updateTopology();

         allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org