Author: jbellis Date: Fri Oct 22 21:18:15 2010 New Revision: 1026497 URL: http://svn.apache.org/viewvc?rev=1026497&view=rev Log: move strategy creation into Table instantiation so it can't be out of sync patch by jbellis; tested by Paul Cannon for CASSANDRA-1649
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Oct 22 21:18:15 2010 @@ -963,13 +963,11 @@ public class DatabaseDescriptor { tables.put(ksm.name, ksm); DatabaseDescriptor.defsVersion = newVersion; - StorageService.instance.initReplicationStrategy(ksm.name); } public static void clearTableDefinition(KSMetaData ksm, UUID newVersion) { tables.remove(ksm.name); - StorageService.instance.clearReplicationStrategy(ksm.name); DatabaseDescriptor.defsVersion = newVersion; } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Oct 22 21:18:15 2010 @@ -37,7 +37,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; @@ -47,6 +49,7 @@ import org.apache.cassandra.io.sstable.R import org.apache.cassandra.io.sstable.SSTableDeletingReference; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -92,6 +95,7 @@ public class Table public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new HashMap<Integer, ColumnFamilyStore>(); // TODO make private again private final Object[] indexLocks; private ScheduledFuture<?> flushTask; + public final AbstractReplicationStrategy replicationStrategy; public static Table open(String table) { @@ -231,6 +235,20 @@ public class Table private Table(String table) { name = table; + KSMetaData ksm = DatabaseDescriptor.getKSMetaData(table); + try + { + replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(table, + ksm.strategyClass, + StorageService.instance.getTokenMetadata(), + DatabaseDescriptor.getEndpointSnitch(), + ksm.strategyOptions); + } + catch (ConfigurationException e) + { + throw new RuntimeException(e); + } + indexLocks = new Object[DatabaseDescriptor.getConcurrentWriters() * 8]; for (int i = 0; i < indexLocks.length; i++) indexLocks[i] = new Object(); Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct 22 21:18:15 2010 @@ -34,6 +34,7 @@ package org.apache.cassandra.dht; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.Table; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; @@ -190,7 +191,7 @@ public class BootStrapper Multimap<Range, InetAddress> getRangesWithSources(String table) { assert tokenMetadata.sortedTokens().size() > 0; - final AbstractReplicationStrategy strat = StorageService.instance.getReplicationStrategy(table); + final AbstractReplicationStrategy strat = Table.open(table).replicationStrategy; Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address); Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create(); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Fri Oct 22 21:18:15 2010 @@ -24,7 +24,7 @@ package org.apache.cassandra.service; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.db.Table; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.Message; @@ -65,7 +65,7 @@ public class DatacenterQuorumResponseHan @Override public int determineBlockFor(ConsistencyLevel consistency_level, String table) { - NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) StorageService.instance.getReplicationStrategy(table); + NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy; return (stategy.getReplicationFactor(localdc) / 2) + 1; } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Fri Oct 22 21:18:15 2010 @@ -31,7 +31,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.db.Table; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.Message; @@ -63,7 +63,7 @@ public class DatacenterSyncWriteResponse super(writeEndpoints, hintedEndpoints, consistencyLevel); assert consistencyLevel == ConsistencyLevel.DCQUORUM; - strategy = (NetworkTopologyStrategy) StorageService.instance.getReplicationStrategy(table); + strategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy; for (String dc : strategy.getDatacenters()) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Fri Oct 22 21:18:15 2010 @@ -28,7 +28,7 @@ import java.net.InetAddress; import java.util.Collection; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.db.Table; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.Message; @@ -65,7 +65,7 @@ public class DatacenterWriteResponseHand @Override protected int determineBlockFor(String table) { - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) StorageService.instance.getReplicationStrategy(table); + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).replicationStrategy; return (strategy.getReplicationFactor(localdc) / 2) + 1; } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Oct 22 21:18:15 2010 @@ -27,7 +27,6 @@ import java.util.concurrent.*; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.collect.AbstractIterator; import com.google.common.collect.Multimap; import static com.google.common.base.Charsets.UTF_8; import org.apache.commons.lang.ArrayUtils; @@ -104,7 +103,7 @@ public class StorageProxy implements Sto { mostRecentRowMutation = rm; String table = rm.getTable(); - AbstractReplicationStrategy rs = ss.getReplicationStrategy(table); + AbstractReplicationStrategy rs = Table.open(table).replicationStrategy; List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key()); Collection<InetAddress> writeEndpoints = ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints); @@ -342,7 +341,7 @@ public class StorageProxy implements Sto if (logger.isDebugEnabled()) logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint); } - AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table); + AbstractReplicationStrategy rs = Table.open(command.table).replicationStrategy; QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level); MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler); quorumResponseHandlers.add(quorumResponseHandler); @@ -368,7 +367,7 @@ public class StorageProxy implements Sto } catch (DigestMismatchException ex) { - AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table); + AbstractReplicationStrategy rs = Table.open(command.table).replicationStrategy; QuorumResponseHandler<Row> qrhRepair = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), ConsistencyLevel.QUORUM); if (logger.isDebugEnabled()) logger.debug("Digest mismatch:", ex); @@ -448,7 +447,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints); - AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.keyspace); + AbstractReplicationStrategy rs = Table.open(command.keyspace).replicationStrategy; QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); // TODO bail early if live endpoints can't satisfy requested consistency level for (InetAddress endpoint : liveEndpoints) @@ -661,7 +660,7 @@ public class StorageProxy implements Sto // collect replies and resolve according to consistency level RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints); - AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(keyspace); + AbstractReplicationStrategy rs = Table.open(keyspace).replicationStrategy; QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level); // TODO bail early if live endpoints can't satisfy requested consistency level IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct 22 21:18:15 2010 @@ -227,9 +227,6 @@ public class StorageService implements I new NamedThreadFactory("ReadRepair"), "request"); - /* We use this interface to determine where replicas need to be placed */ - private final Map<String, AbstractReplicationStrategy> replicationStrategies; - private Set<InetAddress> replicatingNodes; private InetAddress removingNode; @@ -300,54 +297,11 @@ public class StorageService implements I MessagingService.instance.registerVerbHandlers(Verb.TRUNCATE, new TruncateVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler()); - replicationStrategies = new HashMap<String, AbstractReplicationStrategy>(); - for (String table : DatabaseDescriptor.getTables()) - initReplicationStrategy(table); - // spin up the streaming serivice so it is available for jmx tools. if (StreamingService.instance == null) throw new RuntimeException("Streaming service is unavailable."); } - public AbstractReplicationStrategy getReplicationStrategy(String table) - { - AbstractReplicationStrategy ars = replicationStrategies.get(table); - assert ars != null: String.format("No replica strategy configured for %s", table); - return ars; - } - - public void initReplicationStrategy(String table) - { - AbstractReplicationStrategy strat = createReplicationStrategy(tokenMetadata_, table); - replicationStrategies.put(table, strat); - } - - public void clearReplicationStrategy(String table) - { - replicationStrategies.remove(table); - } - - public static AbstractReplicationStrategy createReplicationStrategy(TokenMetadata tokenMetadata, String table) - { - AbstractReplicationStrategy replicationStrategy; - KSMetaData ksm = DatabaseDescriptor.getKSMetaData(table); - try - { - replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy( - table, - ksm.strategyClass, - tokenMetadata, - DatabaseDescriptor.getEndpointSnitch(), - ksm.strategyOptions - ); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - return replicationStrategy; - } - public void stopClient() { Gossiper.instance.unregister(migrationManager); @@ -606,7 +560,7 @@ public class StorageService implements I Map<Range, List<InetAddress>> rangeToEndpointMap = new HashMap<Range, List<InetAddress>>(); for (Range range : ranges) { - rangeToEndpointMap.put(range, getReplicationStrategy(keyspace).getNaturalEndpoints(range.right)); + rangeToEndpointMap.put(range, Table.open(keyspace).replicationStrategy.getNaturalEndpoints(range.right)); } return rangeToEndpointMap; } @@ -868,7 +822,7 @@ public class StorageService implements I private void calculatePendingRanges() { for (String table : DatabaseDescriptor.getNonSystemTables()) - calculatePendingRanges(getReplicationStrategy(table), table); + calculatePendingRanges(Table.open(table).replicationStrategy, table); } // public & static for testing purposes @@ -938,7 +892,7 @@ public class StorageService implements I private Multimap<InetAddress, Range> getNewSourceRanges(String table, Set<Range> ranges) { InetAddress myAddress = FBUtilities.getLocalAddress(); - Multimap<Range, InetAddress> rangeAddresses = getReplicationStrategy(table).getRangeAddresses(tokenMetadata_); + Multimap<Range, InetAddress> rangeAddresses = Table.open(table).replicationStrategy.getRangeAddresses(tokenMetadata_); Multimap<InetAddress, Range> sourceRanges = HashMultimap.create(); IFailureDetector failureDetector = FailureDetector.instance; @@ -1061,7 +1015,7 @@ public class StorageService implements I // Find (for each range) all nodes that store replicas for these ranges as well for (Range range : ranges) - currentReplicaEndpoints.put(range, getReplicationStrategy(table).calculateNaturalEndpoints(range.right, tokenMetadata_)); + currentReplicaEndpoints.put(range, Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right, tokenMetadata_)); TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft(); @@ -1079,7 +1033,7 @@ public class StorageService implements I // range. for (Range range : ranges) { - Collection<InetAddress> newReplicaEndpoints = getReplicationStrategy(table).calculateNaturalEndpoints(range.right, temp); + Collection<InetAddress> newReplicaEndpoints = Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right, temp); newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range)); if (logger_.isDebugEnabled()) if (newReplicaEndpoints.isEmpty()) @@ -1403,7 +1357,7 @@ public class StorageService implements I */ Collection<Range> getRangesForEndpoint(String table, InetAddress ep) { - return getReplicationStrategy(table).getAddressRanges().get(ep); + return Table.open(table).replicationStrategy.getAddressRanges().get(ep); } /** @@ -1453,7 +1407,7 @@ public class StorageService implements I */ public List<InetAddress> getNaturalEndpoints(String table, Token token) { - return getReplicationStrategy(table).getNaturalEndpoints(token); + return Table.open(table).replicationStrategy.getNaturalEndpoints(token); } /** @@ -1471,7 +1425,7 @@ public class StorageService implements I public List<InetAddress> getLiveNaturalEndpoints(String table, Token token) { List<InetAddress> liveEps = new ArrayList<InetAddress>(); - List<InetAddress> endpoints = getReplicationStrategy(table).getNaturalEndpoints(token); + List<InetAddress> endpoints = Table.open(table).replicationStrategy.getNaturalEndpoints(token); for (InetAddress endpoint : endpoints) { Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java Fri Oct 22 21:18:15 2010 @@ -22,7 +22,8 @@ package org.apache.cassandra.locator; import java.net.InetAddress; import java.util.*; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.db.Table; + import org.apache.commons.lang.StringUtils; import org.junit.Test; @@ -42,7 +43,7 @@ public class ReplicationStrategyEndpoint tmd = new TokenMetadata(); searchToken = new BigIntegerToken(String.valueOf(15)); - strategy = getStrategyWithNewTokenMetadata(StorageService.instance.getReplicationStrategy("Keyspace3"), tmd); + strategy = getStrategyWithNewTokenMetadata(Table.open("Keyspace3").replicationStrategy, tmd); tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1")); tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2")); Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java Fri Oct 22 21:18:15 2010 @@ -33,6 +33,7 @@ import org.junit.Test; import static org.junit.Assert.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.StorageServiceAccessor; @@ -42,13 +43,7 @@ public class SimpleStrategyTest extends @Test public void tryValidTable() { - assert StorageService.instance.getReplicationStrategy("Keyspace1") != null; - } - - @Test(expected = AssertionError.class) - public void tryBogusTable() - { - StorageService.instance.getReplicationStrategy("SomeBogusTableThatDoesntExist"); + assert Table.open("Keyspace1").replicationStrategy != null; } @Test Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1026497&r1=1026496&r2=1026497&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Fri Oct 22 21:18:15 2010 @@ -184,7 +184,7 @@ public class AntiEntropyServiceTest exte // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename)); - AbstractReplicationStrategy ars = StorageService.instance.getReplicationStrategy(tablename); + AbstractReplicationStrategy ars = Table.open(tablename).replicationStrategy; Set<InetAddress> expected = new HashSet<InetAddress>(); for (Range replicaRange : ars.getAddressRanges().get(FBUtilities.getLocalAddress())) {