Author: jbellis Date: Mon Jun 14 21:53:09 2010 New Revision: 954657 URL: http://svn.apache.org/viewvc?rev=954657&view=rev Log: Fix bootstrap with DSS and add endpoint caching. patch by mdennis and jbellis for CASSANDRA-1147
Added: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/conf/cassandra-rack.properties cassandra/trunk/test/conf/datacenters.properties cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Added: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=954657&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Mon Jun 14 21:53:09 2010 @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class AbstractEndpointSnitch implements IEndpointSnitch +{ + /* list of subscribers that are notified when cached values from this snitch are invalidated */ + protected List<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>(); + + public void register(AbstractReplicationStrategy subscriber) + { + subscribers.add(subscriber); + } + + protected void invalidateCachedSnitchValues() + { + for (AbstractReplicationStrategy subscriber : subscribers) + subscriber.invalidateCachedSnitchValues(); + } + + public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress); + public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses); +} Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Mon Jun 14 21:53:09 2010 @@ -1,4 +1,3 @@ -package org.apache.cassandra.locator; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +19,7 @@ package org.apache.cassandra.locator; * */ +package org.apache.cassandra.locator; import java.net.InetAddress; import java.net.UnknownHostException; @@ -29,7 +29,7 @@ import java.util.*; * An endpoint snitch tells Cassandra information about network topology that it can use to route * requests more efficiently. */ -public abstract class AbstractRackAwareSnitch implements IEndpointSnitch +public abstract class AbstractRackAwareSnitch extends AbstractEndpointSnitch { /** * Return the rack for which an endpoint resides in Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Jun 14 21:53:09 2010 @@ -16,57 +16,79 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.cassandra.locator; import java.net.InetAddress; import java.util.*; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.service.AbstractWriteResponseHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.service.AbstractWriteResponseHandler; import org.apache.cassandra.service.IResponseResolver; import org.apache.cassandra.service.QuorumResponseHandler; import org.apache.cassandra.service.WriteResponseHandler; import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; +import org.cliffc.high_scale_lib.NonBlockingHashMap; /** * A abstract parent for all replication strategies. */ public abstract class AbstractReplicationStrategy { - protected static final Logger logger_ = LoggerFactory.getLogger(AbstractReplicationStrategy.class); + private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class); - protected TokenMetadata tokenMetadata_; - protected final IEndpointSnitch snitch_; + private TokenMetadata tokenMetadata; + protected final IEndpointSnitch snitch; + private final Map<EndpointCacheKey, ArrayList<InetAddress>> cachedEndpoints; AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) { - tokenMetadata_ = tokenMetadata; - snitch_ = snitch; + this.tokenMetadata = tokenMetadata; + this.snitch = snitch; + cachedEndpoints = new NonBlockingHashMap<EndpointCacheKey, ArrayList<InetAddress>>(); + this.tokenMetadata.register(this); + if (this.snitch != null) + this.snitch.register(this); } /** - * get the endpoints that should store the given Token, for the given table. + * get the (possibly cached) endpoints that should store the given Token, for the given table. * Note that while the endpoints are conceptually a Set (no duplicates will be included), - * we return a List to avoid an extra allocation when sorting by proximity later. + * we return a List to avoid an extra allocation when sorting by proximity later + * @param searchToken the token the natural endpoints are requested for + * @param table the table the natural endpoints are requested for + * @return a copy of the natural endpoints for the given token and table */ - public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table); - - public ArrayList<InetAddress> getNaturalEndpoints(Token token, String table) + public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, String table) { - return getNaturalEndpoints(token, tokenMetadata_, table); + // TODO creating a iterator object just to get the closest token is wasteful -- we do in multiple places w/ ringIterator + Token keyToken = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken).next(); + EndpointCacheKey cacheKey = new EndpointCacheKey(table, keyToken); + ArrayList<InetAddress> endpoints = cachedEndpoints.get(cacheKey); + if (endpoints == null) + { + TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap(); + keyToken = TokenMetadata.ringIterator(tokenMetadataClone.sortedTokens(), searchToken).next(); + cacheKey = new EndpointCacheKey(table, keyToken); + endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone, table)); + cachedEndpoints.put(cacheKey, endpoints); + } + + return new ArrayList<InetAddress>(endpoints); } + public abstract Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table); + public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, @@ -132,12 +154,12 @@ public abstract class AbstractReplicatio */ public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints) { - if (tokenMetadata_.getPendingRanges(table).isEmpty()) + if (tokenMetadata.getPendingRanges(table).isEmpty()) return naturalEndpoints; List<InetAddress> endpoints = new ArrayList<InetAddress>(naturalEndpoints); - for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata_.getPendingRanges(table).entrySet()) + for (Map.Entry<Range, Collection<InetAddress>> entry : tokenMetadata.getPendingRanges(table).entrySet()) { if (entry.getKey().contains(token)) { @@ -161,7 +183,7 @@ public abstract class AbstractReplicatio for (Token token : metadata.sortedTokens()) { Range range = metadata.getPrimaryRangeFor(token); - for (InetAddress ep : getNaturalEndpoints(token, metadata, table)) + for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table)) { map.put(ep, range); } @@ -177,7 +199,7 @@ public abstract class AbstractReplicatio for (Token token : metadata.sortedTokens()) { Range range = metadata.getPrimaryRangeFor(token); - for (InetAddress ep : getNaturalEndpoints(token, metadata, table)) + for (InetAddress ep : calculateNaturalEndpoints(token, metadata, table)) { map.put(range, ep); } @@ -188,7 +210,7 @@ public abstract class AbstractReplicatio public Multimap<InetAddress, Range> getAddressRanges(String table) { - return getAddressRanges(tokenMetadata_, table); + return getAddressRanges(tokenMetadata, table); } public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress, String table) @@ -202,4 +224,25 @@ public abstract class AbstractReplicatio { return new QuorumResponseHandler(responseResolver, consistencyLevel, table); } + + protected static class EndpointCacheKey extends Pair<String, Token> + { + public EndpointCacheKey(String table, Token keyToken) {super(table, keyToken);} + } + + protected void clearCachedEndpoints() + { + logger.debug("clearing cached endpoints"); + cachedEndpoints.clear(); + } + + public void invalidateCachedTokenEndpointValues() + { + clearCachedEndpoints(); + } + + public void invalidateCachedSnitchValues() + { + clearCachedEndpoints(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon Jun 14 21:53:09 2010 @@ -1,5 +1,3 @@ -package org.apache.cassandra.locator; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,19 +19,21 @@ package org.apache.cassandra.locator; * */ +package org.apache.cassandra.locator; import java.net.InetAddress; import java.util.*; import java.util.Map.Entry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.collect.Multimap; import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.utils.ResourceWatcher; import org.apache.cassandra.service.*; import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.ResourceWatcher; import org.apache.cassandra.utils.WrappedRunnable; /** @@ -52,9 +52,9 @@ import org.apache.cassandra.utils.Wrappe public class DatacenterShardStrategy extends AbstractReplicationStrategy { private static final String DATACENTER_PROPERTY_FILENAME = "datacenters.properties"; - private Map<String, List<Token>> dcTokens; private AbstractRackAwareSnitch snitch; - private Map<String, Map<String, Integer>> datacenters = new HashMap<String, Map<String, Integer>>(); + private volatile Map<String, Map<String, Integer>> datacenters; + private static final Logger logger = LoggerFactory.getLogger(DatacenterShardStrategy.class); public DatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException { @@ -72,108 +72,88 @@ public class DatacenterShardStrategy ext } }; ResourceWatcher.watch(DATACENTER_PROPERTY_FILENAME, runnable, 60 * 1000); - - loadEndpoints(tokenMetadata); } - public void reloadConfiguration() throws ConfigurationException + public synchronized void reloadConfiguration() throws ConfigurationException { Properties props = PropertyFileSnitch.resourceToProperties(DATACENTER_PROPERTY_FILENAME); - for (Object key : props.keySet()) + Map<String, Map<String, Integer>> newDatacenters = new HashMap<String, Map<String, Integer>>(); + for (Entry entry : props.entrySet()) { - String[] keys = ((String)key).split(":"); - Map<String, Integer> map = datacenters.get(keys[0]); - if (null == map) - { + String[] keys = ((String)entry.getKey()).split(":"); + Map<String, Integer> map = newDatacenters.get(keys[0]); + if (map == null) map = new HashMap<String, Integer>(); - } - map.put(keys[1], Integer.parseInt((String)props.get(key))); - datacenters.put(keys[0], map); + map.put(keys[1], Integer.parseInt((String) entry.getValue())); + newDatacenters.put(keys[0], map); } - } - - private synchronized void loadEndpoints(TokenMetadata metadata) throws ConfigurationException - { - String localDC = snitch.getDatacenter(DatabaseDescriptor.getListenAddress()); - if (localDC == null) - throw new ConfigurationException("Invalid datacenter configuration; couldn't find local host " + FBUtilities.getLocalAddress()); - - dcTokens = new HashMap<String, List<Token>>(); - for (Token token : metadata.sortedTokens()) - { - InetAddress endPoint = metadata.getEndpoint(token); - String dataCenter = snitch.getDatacenter(endPoint); - // add tokens to dcmap. - List<Token> lst = dcTokens.get(dataCenter); - if (lst == null) + datacenters = Collections.unmodifiableMap(newDatacenters); + logger.info(DATACENTER_PROPERTY_FILENAME + " changed, clearing endpoint cache"); + clearCachedEndpoints(); + } + + public Set<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, String table) + { + int totalReplicas = getReplicationfactor(table); + Map<String, Integer> remainingReplicas = new HashMap<String, Integer>(datacenters.get(table)); + Map<String, Set<String>> dcUsedRacks = new HashMap<String, Set<String>>(); + Set<InetAddress> endpoints = new HashSet<InetAddress>(totalReplicas); + + // first pass: only collect replicas on unique racks + for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken); + endpoints.size() < totalReplicas && iter.hasNext();) + { + Token token = iter.next(); + InetAddress endpoint = tokenMetadata.getEndpoint(token); + String datacenter = snitch.getDatacenter(endpoint); + int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0; + if (remaining > 0) { - lst = new ArrayList<Token>(); - } - lst.add(token); - dcTokens.put(dataCenter, lst); - } - for (Entry<String, List<Token>> entry : dcTokens.entrySet()) - { - List<Token> valueList = entry.getValue(); - Collections.sort(valueList); - dcTokens.put(entry.getKey(), valueList); - } - - // TODO verify that each DC has enough endpoints for the desired RF - } - - public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, TokenMetadata metadata, String table) - { - ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(); - - if (metadata.sortedTokens().isEmpty()) - return endpoints; - - for (String dc : dcTokens.keySet()) - { - List<Token> tokens = dcTokens.get(dc); - Set<String> racks = new HashSet<String>(); - // Add the node at the index by default - Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken); - InetAddress initialDCHost = metadata.getEndpoint(iter.next()); - assert initialDCHost != null; - endpoints.add(initialDCHost); - racks.add(snitch.getRack(initialDCHost)); - - // find replicas on unique racks - int replicas = getReplicationFactor(dc, table); - int localEndpoints = 1; - while (localEndpoints < replicas && iter.hasNext()) - { - Token t = iter.next(); - InetAddress endpoint = metadata.getEndpoint(t); - if (!racks.contains(snitch.getRack(endpoint))) + Set<String> usedRacks = dcUsedRacks.get(datacenter); + if (usedRacks == null) + { + usedRacks = new HashSet<String>(); + dcUsedRacks.put(datacenter, usedRacks); + } + String rack = snitch.getRack(endpoint); + if (!usedRacks.contains(rack)) { endpoints.add(endpoint); - localEndpoints++; + usedRacks.add(rack); + remainingReplicas.put(datacenter, remaining - 1); } } + } - if (localEndpoints == replicas) + // 2nd pass: if replica count has not been achieved from unique racks, add nodes from the same racks + for (Iterator<Token> iter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken); + endpoints.size() < totalReplicas && iter.hasNext();) + { + Token token = iter.next(); + InetAddress endpoint = tokenMetadata.getEndpoint(token); + if (endpoints.contains(endpoint)) continue; - // if not enough unique racks were found, re-loop and add other endpoints - iter = TokenMetadata.ringIterator(tokens, searchToken); - iter.next(); // skip the first one since we already know it's used - while (localEndpoints < replicas && iter.hasNext()) + String datacenter = snitch.getDatacenter(endpoint); + int remaining = remainingReplicas.containsKey(datacenter) ? remainingReplicas.get(datacenter) : 0; + if (remaining > 0) { - Token t = iter.next(); - if (!endpoints.contains(metadata.getEndpoint(t))) - { - localEndpoints++; - endpoints.add(metadata.getEndpoint(t)); - } + endpoints.add(endpoint); + remainingReplicas.put(datacenter, remaining - 1); } } return endpoints; } + public int getReplicationfactor(String table) + { + int total = 0; + for (int repFactor : datacenters.get(table).values()) + total += repFactor; + return total; + } + public int getReplicationFactor(String dc, String table) { return datacenters.get(table).get(dc); Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Mon Jun 14 21:53:09 2010 @@ -18,13 +18,9 @@ package org.apache.cassandra.locator; -import java.net.UnknownHostException; - import java.net.InetAddress; -import java.util.Set; -import java.util.List; import java.util.Collection; - +import java.util.List; /** * This interface helps determine location of node in the data center relative to another node. @@ -43,5 +39,10 @@ public interface IEndpointSnitch * This method will sort the <tt>List</tt> by proximity to the given address. */ public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses); -} + /** + * register to receive notification when the endpoint snitch has changed the answers it was providing. + * @param subscriber the subscriber to notify + */ + public void register(AbstractReplicationStrategy subscriber); +} Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Mon Jun 14 21:53:09 2010 @@ -24,23 +24,25 @@ import java.net.InetAddress; import java.util.Properties; import java.util.StringTokenizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ResourceWatcher; import org.apache.cassandra.utils.WrappedRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Used to determine if two IP's are in the same datacenter or on the same rack. * <p/> * Based on a properties file configuration. */ -public class PropertyFileSnitch extends AbstractRackAwareSnitch { +public class PropertyFileSnitch extends AbstractRackAwareSnitch +{ /** * A list of properties with keys being host:port and values being datacenter:rack */ - private volatile Properties hostProperties = new Properties(); + private volatile Properties hostProperties; /** * The default rack property file to be read. @@ -112,6 +114,7 @@ public class PropertyFileSnitch extends public void reloadConfiguration() throws ConfigurationException { hostProperties = resourceToProperties(RACK_PROPERTY_FILENAME); + invalidateCachedSnitchValues(); } public static Properties resourceToProperties(String filename) throws ConfigurationException Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Jun 14 21:53:09 2010 @@ -16,16 +16,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.cassandra.locator; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.net.InetAddress; +import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Token; -import java.net.InetAddress; /* * This Replication Strategy returns the nodes responsible for a given @@ -43,10 +41,10 @@ public class RackAwareStrategy extends A throw new IllegalArgumentException(("RackAwareStrategy requires AbstractRackAwareSnitch.")); } - public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table) + public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) { int replicas = DatabaseDescriptor.getReplicationFactor(table); - ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas); + Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas); List<Token> tokens = metadata.sortedTokens(); if (tokens.isEmpty()) @@ -60,7 +58,7 @@ public class RackAwareStrategy extends A boolean bOtherRack = false; while (endpoints.size() < replicas && iter.hasNext()) { - AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_; + AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) this.snitch; // First try to find one in a different data center Token t = iter.next(); Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Jun 14 21:53:09 2010 @@ -16,15 +16,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.cassandra.locator; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.net.InetAddress; +import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Token; -import java.net.InetAddress; /** * This class returns the nodes responsible for a given @@ -39,11 +38,11 @@ public class RackUnawareStrategy extends super(tokenMetadata, snitch); } - public ArrayList<InetAddress> getNaturalEndpoints(Token token, TokenMetadata metadata, String table) + public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) { int replicas = DatabaseDescriptor.getReplicationFactor(table); List<Token> tokens = metadata.sortedTokens(); - ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas); + Set<InetAddress> endpoints = new HashSet<InetAddress>(replicas); if (tokens.isEmpty()) return endpoints; Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Mon Jun 14 21:53:09 2010 @@ -19,18 +19,19 @@ package org.apache.cassandra.locator; import java.net.InetAddress; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; /** * A simple endpoint snitch implementation does not sort addresses by * proximity. */ -public class SimpleSnitch implements IEndpointSnitch +public class SimpleSnitch extends AbstractEndpointSnitch { public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses) { - List<InetAddress> list = new ArrayList<InetAddress>(addresses); - return list; + return new ArrayList<InetAddress>(addresses); } public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses) Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Mon Jun 14 21:53:09 2010 @@ -18,23 +18,23 @@ package org.apache.cassandra.locator; +import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.collect.*; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.dht.Range; - -import java.net.InetAddress; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.*; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + public class TokenMetadata { private static Logger logger = LoggerFactory.getLogger(TokenMetadata.class); @@ -64,6 +64,9 @@ public class TokenMetadata private final ReadWriteLock lock = new ReentrantReadWriteLock(true); private List<Token> sortedTokens; + /* list of subscribers that are notified when the tokenToEndpointMap changed */ + private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers; + public TokenMetadata() { this(null); @@ -78,6 +81,7 @@ public class TokenMetadata leavingEndpoints = new HashSet<InetAddress>(); pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>(); sortedTokens = sortTokens(); + subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>(); } private List<Token> sortTokens() @@ -116,6 +120,7 @@ public class TokenMetadata sortedTokens = sortTokens(); } leavingEndpoints.remove(endpoint); + fireTokenToEndpointMapChanged(); } finally { @@ -205,6 +210,7 @@ public class TokenMetadata tokenToEndpointMap.inverse().remove(endpoint); leavingEndpoints.remove(endpoint); sortedTokens = sortTokens(); + fireTokenToEndpointMapChanged(); } finally { @@ -454,6 +460,7 @@ public class TokenMetadata tokenToEndpointMap.clear(); leavingEndpoints.clear(); pendingRanges.clear(); + fireTokenToEndpointMapChanged(); } public String toString() @@ -529,4 +536,17 @@ public class TokenMetadata return sb.toString(); } + + protected void fireTokenToEndpointMapChanged() + { + for (AbstractReplicationStrategy subscriber : subscribers) + { + subscriber.invalidateCachedTokenEndpointValues(); + } + } + + public void register(AbstractReplicationStrategy subscriber) + { + subscribers.add(subscriber); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jun 14 21:53:09 2010 @@ -22,20 +22,24 @@ import java.io.IOError; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.reflect.Constructor; +import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.net.InetAddress; -import javax.management.*; +import java.util.concurrent.*; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.commons.lang.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import org.apache.cassandra.concurrent.*; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; @@ -43,27 +47,25 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.migration.AddKeyspace; import org.apache.cassandra.db.migration.Migration; -import org.apache.cassandra.dht.*; +import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.DeletionService; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.net.*; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ResponseVerbHandler; import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler; import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.cassandra.io.util.FileUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.log4j.Level; -import org.apache.commons.lang.StringUtils; - -import com.google.common.collect.Multimap; -import com.google.common.collect.HashMultimap; /* * This abstraction contains the token/identifier of this node @@ -764,8 +766,8 @@ public class StorageService implements I // all leaving nodes are gone. for (Range range : affectedRanges) { - List<InetAddress> currentEndpoints = strategy.getNaturalEndpoints(range.right, tm, table); - List<InetAddress> newEndpoints = strategy.getNaturalEndpoints(range.right, allLeftMetadata, table); + Set<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm, table); + Set<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata, table); newEndpoints.removeAll(currentEndpoints); pendingRanges.putAll(range, newEndpoints); } @@ -870,11 +872,11 @@ public class StorageService implements I if (logger_.isDebugEnabled()) logger_.debug("Node " + endpoint + " ranges [" + StringUtils.join(ranges, ", ") + "]"); - Map<Range, ArrayList<InetAddress>> currentReplicaEndpoints = new HashMap<Range, ArrayList<InetAddress>>(); + Map<Range, Set<InetAddress>> currentReplicaEndpoints = new HashMap<Range, Set<InetAddress>>(); // Find (for each range) all nodes that store replicas for these ranges as well for (Range range : ranges) - currentReplicaEndpoints.put(range, getReplicationStrategy(table).getNaturalEndpoints(range.right, tokenMetadata_, table)); + currentReplicaEndpoints.put(range, getReplicationStrategy(table).calculateNaturalEndpoints(range.right, tokenMetadata_, table)); TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft(); @@ -892,7 +894,7 @@ public class StorageService implements I // range. for (Range range : ranges) { - ArrayList<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).getNaturalEndpoints(range.right, temp, table); + Set<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).calculateNaturalEndpoints(range.right, temp, table); newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range)); if (logger_.isDebugEnabled()) if (newReplicaEndpoints.isEmpty()) Modified: cassandra/trunk/test/conf/cassandra-rack.properties URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/test/conf/cassandra-rack.properties (original) +++ cassandra/trunk/test/conf/cassandra-rack.properties Mon Jun 14 21:53:09 2010 @@ -32,5 +32,11 @@ 10.21.119.14=DC3:RAC2 10.20.114.15=DC2:RAC2 +127.0.0.1=DC1:RAC1 +127.0.0.2=DC2:RAC2 +127.0.0.3=DC3:RAC3 +127.0.0.4=DC4:RAC4 +127.0.0.5=DC3:RAC3 + # default for unknown nodes -default=DC1:r1 \ No newline at end of file +default=DC1:r1 Modified: cassandra/trunk/test/conf/datacenters.properties URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/test/conf/datacenters.properties (original) +++ cassandra/trunk/test/conf/datacenters.properties Mon Jun 14 21:53:09 2010 @@ -20,4 +20,7 @@ # keyspace\:datacenter=replication factor Keyspace1\:DC1=3 Keyspace1\:DC2=2 -Keyspace1\:DC3=1 \ No newline at end of file +Keyspace1\:DC3=1 +Keyspace3\:DC1=3 +Keyspace3\:DC2=2 +Keyspace3\:DC3=1 \ No newline at end of file Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java Mon Jun 14 21:53:09 2010 @@ -1,3 +1,22 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + package org.apache.cassandra.locator; import java.io.IOException; @@ -5,15 +24,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashSet; - import javax.xml.parsers.ParserConfigurationException; +import org.junit.Test; + import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.dht.StringToken; import org.apache.cassandra.dht.Token; - -import org.junit.Test; - import org.xml.sax.SAXException; public class DatacenterShardStrategyTest Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackAwareStrategyTest.java Mon Jun 14 21:53:09 2010 @@ -16,9 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cassandra.locator; -import static org.junit.Assert.assertEquals; +package org.apache.cassandra.locator; import java.net.InetAddress; import java.net.UnknownHostException; @@ -27,13 +26,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.BigIntegerToken; import org.apache.cassandra.dht.Token; -import org.junit.Before; -import org.junit.Test; - public class RackAwareStrategyTest { private List<Token> endpointTokens; @@ -160,7 +160,7 @@ public class RackAwareStrategyTest { for (Token keyToken : keyTokens) { - List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, tmd, table); + List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyToken, table); for (int j = 0; j < endpoints.size(); j++) { ArrayList<InetAddress> hostsExpected = expectedResults.get(keyToken.toString()); Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=954657&r1=954656&r2=954657&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Mon Jun 14 21:53:09 2010 @@ -18,28 +18,21 @@ */ package org.apache.cassandra.locator; -import static org.junit.Assert.*; - -import java.util.Arrays; -import java.util.List; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.junit.Test; +import static org.junit.Assert.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.gms.ApplicationState; -import org.apache.cassandra.service.StorageServiceAccessor; -import org.junit.Test; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.dht.BigIntegerToken; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.dht.OrderPreservingPartitioner; -import org.apache.cassandra.dht.StringToken; +import org.apache.cassandra.dht.*; import org.apache.cassandra.service.StorageService; - -import java.net.InetAddress; -import java.net.UnknownHostException; +import org.apache.cassandra.service.StorageServiceAccessor; public class RackUnawareStrategyTest extends SchemaLoader { @@ -71,8 +64,7 @@ public class RackUnawareStrategyTest ext endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5))); } - for (String table : DatabaseDescriptor.getNonSystemTables()) - testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table); + verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0])); } @Test @@ -88,29 +80,31 @@ public class RackUnawareStrategyTest ext endpointTokens.add(new StringToken(String.valueOf((char)('a' + i * 2)))); keyTokens.add(partitioner.getToken(String.valueOf((char)('a' + i * 2 + 1)).getBytes())); } - for (String table : DatabaseDescriptor.getNonSystemTables()) - testGetEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0]), table); + verifyGetNaturalEndpoints(tmd, strategy, endpointTokens.toArray(new Token[0]), keyTokens.toArray(new Token[0])); } // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens, // make sure that the Strategy picks the right endpoints for the keys. - private void testGetEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens, String table) throws UnknownHostException + private void verifyGetNaturalEndpoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endpointTokens, Token[] keyTokens) throws UnknownHostException { - List<InetAddress> hosts = new ArrayList<InetAddress>(); - for (int i = 0; i < endpointTokens.length; i++) + for (String table : DatabaseDescriptor.getNonSystemTables()) { - InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); - tmd.updateNormalToken(endpointTokens[i], ep); - hosts.add(ep); - } + List<InetAddress> hosts = new ArrayList<InetAddress>(); + for (int i = 0; i < endpointTokens.length; i++) + { + InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); + tmd.updateNormalToken(endpointTokens[i], ep); + hosts.add(ep); + } - for (int i = 0; i < keyTokens.length; i++) - { - List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table); - assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size()); - for (int j = 0; j < endpoints.size(); j++) + for (int i = 0; i < keyTokens.length; i++) { - assertEquals(endpoints.get(j), hosts.get((i + j + 1) % hosts.size())); + List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i], table); + assertEquals(DatabaseDescriptor.getReplicationFactor(table), endpoints.size()); + List<InetAddress> correctEndpoints = new ArrayList<InetAddress>(); + for (int j = 0; j < endpoints.size(); j++) + correctEndpoints.add(hosts.get((i + j + 1) % hosts.size())); + assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints)); } } } Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=954657&view=auto ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (added) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Mon Jun 14 21:53:09 2010 @@ -0,0 +1,156 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.locator; + +import java.lang.reflect.Constructor; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.dht.BigIntegerToken; +import org.apache.cassandra.dht.Token; + +public class ReplicationStrategyEndpointCacheTest extends SchemaLoader +{ + private TokenMetadata tmd; + private Token searchToken; + private AbstractReplicationStrategy strategy; + + public void setup(Class stratClass) throws Exception + { + tmd = new TokenMetadata(); + searchToken = new BigIntegerToken(String.valueOf(15)); + Constructor constructor = stratClass.getConstructor(TokenMetadata.class, IEndpointSnitch.class); + strategy = (AbstractReplicationStrategy) constructor.newInstance(tmd, new PropertyFileSnitch()); + + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2")); + } + + @Test + public void testEndpointsWereCached() throws Exception + { + runEndpointsWereCachedTest(FakeRackUnawareStrategy.class); + runEndpointsWereCachedTest(FakeRackAwareStrategy.class); + runEndpointsWereCachedTest(FakeDatacenterShardStrategy.class); + } + + public void runEndpointsWereCachedTest(Class stratClass) throws Exception + { + setup(stratClass); + assert strategy.getNaturalEndpoints(searchToken, "Keyspace3").equals(strategy.getNaturalEndpoints(searchToken, "Keyspace3")); + } + + @Test + public void testCacheRespectsTokenChanges() throws Exception + { + runCacheRespectsTokenChangesTest(RackUnawareStrategy.class); + runCacheRespectsTokenChangesTest(RackAwareStrategy.class); + runCacheRespectsTokenChangesTest(DatacenterShardStrategy.class); + } + + public void runCacheRespectsTokenChangesTest(Class stratClass) throws Exception + { + // TODO DSS is asked to provide a total of 6 replicas, but we never give it 6 endpoints. + // thus we are testing undefined behavior, at best. + setup(stratClass); + ArrayList<InetAddress> endpoints; + + endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); + + // test token addition + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3")); + endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + assert endpoints.size() == 3 : StringUtils.join(endpoints, ","); + + // test token removal + tmd.removeEndpoint(InetAddress.getByName("127.0.0.2")); + endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); + + // test token change + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.5")); + endpoints = strategy.getNaturalEndpoints(searchToken, "Keyspace3"); + assert endpoints.size() == 2 : StringUtils.join(endpoints, ","); + assert endpoints.contains(InetAddress.getByName("127.0.0.5")); + assert !endpoints.contains(InetAddress.getByName("127.0.0.3")); + } + + protected static class FakeRackUnawareStrategy extends RackUnawareStrategy + { + private boolean called = false; + + public FakeRackUnawareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) + { + super(tokenMetadata, snitch); + } + + @Override + public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) + { + assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + called = true; + return super.calculateNaturalEndpoints(token, metadata, table); + } + } + + protected static class FakeRackAwareStrategy extends RackAwareStrategy + { + private boolean called = false; + + public FakeRackAwareStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) + { + super(tokenMetadata, snitch); + } + + @Override + public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) + { + assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + called = true; + return super.calculateNaturalEndpoints(token, metadata, table); + } + } + + protected static class FakeDatacenterShardStrategy extends DatacenterShardStrategy + { + private boolean called = false; + + public FakeDatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException + { + super(tokenMetadata, snitch); + } + + @Override + public Set<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata, String table) + { + assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; + called = true; + return super.calculateNaturalEndpoints(token, metadata, table); + } + } +}