Author: jbellis Date: Tue Sep 14 02:26:50 2010 New Revision: 996735 URL: http://svn.apache.org/viewvc?rev=996735&view=rev Log: consolidate ApplicationState construction in ApplicationStateFactory patch by jbellis; reviewed by gdusbabek for CASSANDRA-149n9
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Tue Sep 14 02:26:50 2010 @@ -165,7 +165,7 @@ public abstract class Migration // immediate notification for esiting nodes. MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers()); // this is for notifying nodes as they arrive in the cluster. - Gossiper.instance.addLocalApplicationState(MigrationManager.MIGRATION_STATE, new ApplicationState(newVersion.toString())); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MIGRATION, StorageService.stateFactory.migration(newVersion)); } public static UUID getLastMigrationId() Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Tue Sep 14 02:26:50 2010 @@ -21,8 +21,12 @@ package org.apache.cassandra.gms; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.UUID; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.service.StorageService; /** @@ -39,66 +43,105 @@ import org.apache.cassandra.io.ICompactS public class ApplicationState implements Comparable<ApplicationState> { - private static ICompactSerializer<ApplicationState> serializer_; - static - { - serializer_ = new ApplicationStateSerializer(); - } - - int version_; - String state_; + public static final ICompactSerializer<ApplicationState> serializer = new ApplicationStateSerializer(); - - ApplicationState(String state, int version) - { - state_ = state; - version_ = version; - } + // this must be a char that cannot be present in any token + public final static char DELIMITER = ','; + public final static String DELIMITER_STR = new String(new char[] { DELIMITER }); - public static ICompactSerializer<ApplicationState> serializer() - { - return serializer_; - } - - /** - * Wraps the specified state into a ApplicationState instance. - * @param state string representation of arbitrary state. - */ - public ApplicationState(String state) - { - state_ = state; - version_ = VersionGenerator.getNextVersion(); - } - - public String getValue() + public final static String STATE_MOVE = "MOVE"; + public final static String STATE_BOOTSTRAPPING = "BOOT"; + public final static String STATE_NORMAL = "NORMAL"; + public final static String STATE_LEAVING = "LEAVING"; + public final static String STATE_LEFT = "LEFT"; + public final static String STATE_LOAD = "LOAD"; + public static final String STATE_MIGRATION = "MIGRATION"; + + public final static String REMOVE_TOKEN = "remove"; + + public final int version; + public final String state; + + private ApplicationState(String state, int version) { - return state_; + this.state = state; + this.version = version; } - - int getStateVersion() + + private ApplicationState(String state) { - return version_; + this.state = state; + version = VersionGenerator.getNextVersion(); } public int compareTo(ApplicationState apState) { - return this.version_ - apState.getStateVersion(); + return this.version - apState.version; } -} -class ApplicationStateSerializer implements ICompactSerializer<ApplicationState> -{ - public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException + public static class ApplicationStateFactory { - dos.writeUTF(appState.state_); - dos.writeInt(appState.version_); - } + IPartitioner partitioner; - public ApplicationState deserialize(DataInputStream dis) throws IOException - { - String state = dis.readUTF(); - int version = dis.readInt(); - return new ApplicationState(state, version); + public ApplicationStateFactory(IPartitioner partitioner) + { + this.partitioner = partitioner; + } + + public ApplicationState bootstrapping(Token token) + { + return new ApplicationState(ApplicationState.STATE_BOOTSTRAPPING + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + public ApplicationState normal(Token token) + { + return new ApplicationState(ApplicationState.STATE_NORMAL + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + public ApplicationState load(double load) + { + return new ApplicationState(String.valueOf(load)); + } + + public ApplicationState migration(UUID newVersion) + { + return new ApplicationState(newVersion.toString()); + } + + public ApplicationState leaving(Token token) + { + return new ApplicationState(ApplicationState.STATE_LEAVING + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + public ApplicationState left(Token token) + { + return new ApplicationState(ApplicationState.STATE_LEFT + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + public ApplicationState removeNonlocal(Token localToken, Token token) + { + return new ApplicationState(ApplicationState.STATE_NORMAL + + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(localToken) + + ApplicationState.DELIMITER + ApplicationState.REMOVE_TOKEN + + ApplicationState.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + } + + private static class ApplicationStateSerializer implements ICompactSerializer<ApplicationState> + { + public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException + { + dos.writeUTF(appState.state); + dos.writeInt(appState.version); + } + + public ApplicationState deserialize(DataInputStream dis) throws IOException + { + String state = dis.readUTF(); + int version = dis.readInt(); + return new ApplicationState(state, version); + } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Tue Sep 14 02:26:50 2010 @@ -158,7 +158,7 @@ class EndpointStateSerializer implements if (appState != null) { dos.writeUTF(key); - ApplicationState.serializer().serialize(appState, dos); + ApplicationState.serializer.serialize(appState, dos); } } } @@ -177,7 +177,7 @@ class EndpointStateSerializer implements } String key = dis.readUTF(); - ApplicationState appState = ApplicationState.serializer().deserialize(dis); + ApplicationState appState = ApplicationState.serializer.deserialize(dis); epState.addApplicationState(key, appState); } return epState; Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Tue Sep 14 02:26:50 2010 @@ -77,7 +77,7 @@ public class FailureDetector implements { sb.append(entry.getKey()).append("\n"); for (Map.Entry<String, ApplicationState> state : entry.getValue().applicationState_.entrySet()) - sb.append(" ").append(state.getKey()).append(":").append(state.getValue().getValue()).append("\n"); + sb.append(" ").append(state.getKey()).append(":").append(state.getValue().state).append("\n"); } return sb.toString(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Sep 14 02:26:50 2010 @@ -26,8 +26,6 @@ import java.util.concurrent.ConcurrentSk import java.util.concurrent.CopyOnWriteArrayList; import java.net.InetAddress; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -206,7 +204,7 @@ public class Gossiper implements IFailur for (ApplicationState value : appStateMap.values()) { - int stateVersion = value.getStateVersion(); + int stateVersion = value.version; versions.add( stateVersion ); } @@ -477,7 +475,7 @@ public class Gossiper implements IFailur for (Entry<String, ApplicationState> entry : appStateMap.entrySet()) { ApplicationState appState = entry.getValue(); - if ( appState.getStateVersion() > version ) + if ( appState.version > version ) { if ( reqdEndpointState == null ) { @@ -485,7 +483,7 @@ public class Gossiper implements IFailur } final String key = entry.getKey(); if (logger_.isTraceEnabled()) - logger_.trace("Adding state " + key + ": " + appState.getValue()); + logger_.trace("Adding state " + key + ": " + appState.state); reqdEndpointState.addApplicationState(key, appState); } } @@ -714,8 +712,8 @@ public class Gossiper implements IFailur /* If the generations are the same then apply state if the remote version is greater than local version. */ if ( remoteGeneration == localGeneration ) { - int remoteVersion = remoteAppState.getStateVersion(); - int localVersion = localAppState.getStateVersion(); + int remoteVersion = remoteAppState.version; + int localVersion = localAppState.version; if ( remoteVersion > localVersion ) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java Tue Sep 14 02:26:50 2010 @@ -28,13 +28,12 @@ import org.slf4j.LoggerFactory; class LoadDisseminator extends TimerTask { private final static Logger logger_ = LoggerFactory.getLogger(LoadDisseminator.class); - protected final static String loadInfo_= "LOAD-INFORMATION"; - + public void run() { - String diskUtilization = String.valueOf(StorageService.instance.getLoad()); if (logger_.isDebugEnabled()) logger_.debug("Disseminating load info ..."); - Gossiper.instance.addLocalApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization)); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_LOAD, + StorageService.stateFactory.load(StorageService.instance.getLoad())); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Tue Sep 14 02:26:50 2010 @@ -52,7 +52,6 @@ import java.util.concurrent.Future; public class MigrationManager implements IEndpointStateChangeSubscriber { - public static final String MIGRATION_STATE = "MIGRATION"; private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); /** I'm not going to act here. */ @@ -60,19 +59,19 @@ public class MigrationManager implements public void onChange(InetAddress endpoint, String stateName, ApplicationState state) { - if (!MIGRATION_STATE.equals(stateName)) + if (!ApplicationState.STATE_MIGRATION.equals(stateName)) return; - UUID theirVersion = UUID.fromString(state.getValue()); + UUID theirVersion = UUID.fromString(state.state); rectify(theirVersion, endpoint); } /** gets called after a this node joins a cluster */ public void onAlive(InetAddress endpoint, EndpointState state) { - ApplicationState appState = state.getApplicationState(MIGRATION_STATE); + ApplicationState appState = state.getApplicationState(ApplicationState.STATE_MIGRATION); if (appState != null) { - UUID theirVersion = UUID.fromString(appState.getValue()); + UUID theirVersion = UUID.fromString(appState.state); rectify(theirVersion, endpoint); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Tue Sep 14 02:26:50 2010 @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; @@ -194,9 +193,9 @@ public class StorageLoadBalancer impleme public void onChange(InetAddress endpoint, String stateName, ApplicationState state) { - if (!stateName.equals(LoadDisseminator.loadInfo_)) + if (!stateName.equals(ApplicationState.STATE_LOAD)) return; - loadInfo_.put(endpoint, Double.parseDouble(state.getValue())); + loadInfo_.put(endpoint, Double.parseDouble(state.state)); /* // clone load information to perform calculations @@ -213,10 +212,10 @@ public class StorageLoadBalancer impleme public void onJoin(InetAddress endpoint, EndpointState epState) { - ApplicationState loadState = epState.getApplicationState(LoadDisseminator.loadInfo_); + ApplicationState loadState = epState.getApplicationState(ApplicationState.STATE_LOAD); if (loadState != null) { - onChange(endpoint, LoadDisseminator.loadInfo_, loadState); + onChange(endpoint, ApplicationState.STATE_LOAD, loadState); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 14 02:26:50 2010 @@ -50,7 +50,6 @@ import org.apache.cassandra.config.Confi import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.migration.AddKeyspace; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.dht.BootStrapper; @@ -89,19 +88,6 @@ public class StorageService implements I public static final int RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized - public final static String MOVE_STATE = "MOVE"; - - // this must be a char that cannot be present in any token - public final static char Delimiter = ','; - private final static String DelimiterStr = new String(new char[] {Delimiter}); - - public final static String STATE_BOOTSTRAPPING = "BOOT"; - public final static String STATE_NORMAL = "NORMAL"; - public final static String STATE_LEAVING = "LEAVING"; - public final static String STATE_LEFT = "LEFT"; - - public final static String REMOVE_TOKEN = "remove"; - /* All verb handler identifiers */ public enum Verb { @@ -125,7 +111,7 @@ public class StorageService implements I DEFINITIONS_UPDATE_RESPONSE, TRUNCATE, SCHEMA_CHECK, - INDEX_SCAN; + INDEX_SCAN, // remember to add new verbs at the end, since we serialize by ordinal } public static final Verb[] VERBS = Verb.values(); @@ -155,6 +141,7 @@ public class StorageService implements I private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner(); + public static final ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner_); public static final StorageService instance = new StorageService(); @@ -223,7 +210,7 @@ public class StorageService implements I isBootstrapMode = false; SystemTable.setBootstrapped(true); setToken(getLocalToken()); - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.normal(getLocalToken())); logger_.info("Bootstrap/move completed! Now serving reads."); setMode("Normal", false); } @@ -436,7 +423,7 @@ public class StorageService implements I isBootstrapMode = false; SystemTable.setBootstrapped(true); tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress()); - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.normal(token)); setMode("Normal", false); } // don't finish startup (enabling thrift) until after bootstrap is done @@ -457,7 +444,7 @@ public class StorageService implements I SystemTable.setBootstrapped(true); Token token = storageMetadata_.getToken(); tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress()); - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.normal(token)); setMode("Normal", false); } @@ -478,7 +465,7 @@ public class StorageService implements I { isBootstrapMode = true; SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.normal(token)); setMode("Joining: sleeping " + RING_DELAY + " ms for pending range setup", true); try { @@ -584,12 +571,12 @@ public class StorageService implements I /* * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here. - * We are concerned with two events: knowing the token associated with an enpoint, and knowing its operation mode. + * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode. * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal. * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode * should instead be part of the token ring. * - * Normal state progression of a node should be like this: + * Normal MOVE_STATE progression of a node should be like this: * STATE_BOOTSTRAPPING,token * if bootstrapping. stays this way until all files are received. * STATE_NORMAL,token @@ -608,22 +595,22 @@ public class StorageService implements I */ public void onChange(InetAddress endpoint, String apStateName, ApplicationState apState) { - if (!MOVE_STATE.equals(apStateName)) + if (!ApplicationState.STATE_MOVE.equals(apStateName)) return; - String apStateValue = apState.getValue(); - String[] pieces = apStateValue.split(DelimiterStr, -1); + String apStateValue = apState.state; + String[] pieces = apStateValue.split(ApplicationState.DELIMITER_STR, -1); assert (pieces.length > 0); String moveName = pieces[0]; - if (moveName.equals(STATE_BOOTSTRAPPING)) + if (moveName.equals(ApplicationState.STATE_BOOTSTRAPPING)) handleStateBootstrap(endpoint, pieces); - else if (moveName.equals(STATE_NORMAL)) + else if (moveName.equals(ApplicationState.STATE_NORMAL)) handleStateNormal(endpoint, pieces); - else if (moveName.equals(STATE_LEAVING)) + else if (moveName.equals(ApplicationState.STATE_LEAVING)) handleStateLeaving(endpoint, pieces); - else if (moveName.equals(STATE_LEFT)) + else if (moveName.equals(ApplicationState.STATE_LEFT)) handleStateLeft(endpoint, pieces); } @@ -687,7 +674,7 @@ public class StorageService implements I if (pieces.length > 2) { - if (REMOVE_TOKEN.equals(pieces[2])) + if (ApplicationState.REMOVE_TOKEN.equals(pieces[2])) { // remove token was called on a dead node. Token tokenThatLeft = getPartitioner().getTokenFactory().fromString(pieces[3]); @@ -1445,7 +1432,7 @@ public class StorageService implements I */ private void startLeaving() { - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEAVING + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.leaving(getLocalToken())); tokenMetadata_.addLeavingEndpoint(FBUtilities.getLocalAddress()); calculatePendingRanges(); } @@ -1488,7 +1475,7 @@ public class StorageService implements I tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress()); calculatePendingRanges(); - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.left(getLocalToken())); try { Thread.sleep(2 * Gossiper.intervalInMillis_); @@ -1626,7 +1613,7 @@ public class StorageService implements I } // bundle two states together. include this nodes state to keep the status quo, but indicate the leaving token so that it can be dealt with. - Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()) + Delimiter + REMOVE_TOKEN + Delimiter + partitioner_.getTokenFactory().toString(token))); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATE_MOVE, stateFactory.removeNonlocal(getLocalToken(), token)); } public boolean isClientMode() Modified: cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Sep 14 02:26:50 2010 @@ -83,7 +83,7 @@ public class BootStrapperTest extends Cl Range range = ss.getPrimaryRangeForEndpoint(bootstrapSource); Token token = StorageService.getPartitioner().midpoint(range.left, range.right); assert range.contains(token); - ss.onChange(bootstrapAddrs[i], StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + StorageService.getPartitioner().getTokenFactory().toString(token))); + ss.onChange(bootstrapAddrs[i], ApplicationState.STATE_MOVE, StorageService.stateFactory.bootstrapping(token)); } // any further attempt to bootsrtap should fail since every node in the cluster is splitting. @@ -100,7 +100,7 @@ public class BootStrapperTest extends Cl // indicate that one of the nodes is done. see if the node it was bootstrapping from is still available. Range range = ss.getPrimaryRangeForEndpoint(addrs[2]); Token token = StorageService.getPartitioner().midpoint(range.left, range.right); - ss.onChange(bootstrapAddrs[2], StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + StorageService.getPartitioner().getTokenFactory().toString(token))); + ss.onChange(bootstrapAddrs[2], ApplicationState.STATE_MOVE, StorageService.stateFactory.normal(token)); load.put(bootstrapAddrs[2], 0d); InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load); assert addr != null && addr.equals(addrs[2]); @@ -130,9 +130,9 @@ public class BootStrapperTest extends Cl InetAddress myEndpoint = InetAddress.getByName("127.0.0.1"); Range range5 = ss.getPrimaryRangeForEndpoint(five); - Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range5.left, range5.right); + Token fakeToken = StorageService.getPartitioner().midpoint(range5.left, range5.right); assert range5.contains(fakeToken); - ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken))); + ss.onChange(myEndpoint, ApplicationState.STATE_MOVE, StorageService.stateFactory.bootstrapping(fakeToken)); tmd = ss.getTokenMetadata(); InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load); Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=996735&r1=996734&r2=996735&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Tue Sep 14 02:26:50 2010 @@ -53,6 +53,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -79,8 +80,8 @@ public class MoveTest extends CleanupHel // Third node leaves ss.onChange(hosts.get(LEAVING_NODE), - StorageService.MOVE_STATE, - new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING_NODE)))); + ApplicationState.STATE_MOVE, + stateFactory.leaving(endpointTokens.get(LEAVING_NODE))); assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE))); AbstractReplicationStrategy strategy; @@ -123,6 +124,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -136,13 +138,13 @@ public class MoveTest extends CleanupHel // nodes 6, 8 and 9 leave final int[] LEAVING = new int[] {6, 8, 9}; for (int leaving : LEAVING) - ss.onChange(hosts.get(leaving), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(leaving)))); + ss.onChange(hosts.get(leaving), ApplicationState.STATE_MOVE, stateFactory.leaving(endpointTokens.get(leaving))); // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); - ss.onChange(boot1, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(5)))); + ss.onChange(boot1, ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(5))); InetAddress boot2 = InetAddress.getByName("127.0.1.2"); - ss.onChange(boot2, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(7)))); + ss.onChange(boot2, ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(7))); Collection<InetAddress> endpoints = null; @@ -294,9 +296,9 @@ public class MoveTest extends CleanupHel // Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still // leaving and boot2 in progress - ss.onChange(hosts.get(LEAVING[0]), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[0])))); - ss.onChange(hosts.get(LEAVING[2]), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[2])))); - ss.onChange(boot1, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(5)))); + ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATE_MOVE, stateFactory.left(endpointTokens.get(LEAVING[0]))); + ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATE_MOVE, stateFactory.left(endpointTokens.get(LEAVING[2]))); + ss.onChange(boot1, ApplicationState.STATE_MOVE, stateFactory.normal(keyTokens.get(5))); // adjust precalcuated results. this changes what the epected endpoints are. expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); @@ -409,6 +411,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -420,7 +423,7 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); // node 2 leaves - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.leaving(endpointTokens.get(2))); // don't bother to test pending ranges here, that is extensively tested by other // tests. Just check that the node is in appropriate lists. @@ -429,14 +432,14 @@ public class MoveTest extends CleanupHel assertTrue(tmd.getBootstrapTokens().isEmpty()); // Bootstrap the node immedidiately to keyTokens.get(4) without going through STATE_LEFT - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(4)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(4))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2))); // Bootstrap node hosts.get(3) to keyTokens.get(1) - ss.onChange(hosts.get(3), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(3), ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(1))); assertFalse(tmd.isMember(hosts.get(3))); assertFalse(tmd.isLeaving(hosts.get(3))); @@ -444,7 +447,7 @@ public class MoveTest extends CleanupHel assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Bootstrap node hosts.get(2) further to keyTokens.get(3) - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(3)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(3))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -453,8 +456,8 @@ public class MoveTest extends CleanupHel assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Go to normal again for both nodes - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(3)))); - ss.onChange(hosts.get(3), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.normal(keyTokens.get(3))); + ss.onChange(hosts.get(3), ApplicationState.STATE_MOVE, stateFactory.normal(keyTokens.get(2))); assertTrue(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -475,6 +478,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -486,21 +490,21 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); // node 2 leaves - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.leaving(endpointTokens.get(2))); assertTrue(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2))); // back to normal - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.normal(keyTokens.get(2))); assertTrue(tmd.getLeavingEndpoints().isEmpty()); assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2))); - // node 3 goes through leave and left and then jumps to normal - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(2)))); - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(2)))); - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(4)))); + // node 3 goes through leave and left and then jumps to normal at its new token + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.leaving(keyTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.left(keyTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.normal(keyTokens.get(4))); assertTrue(tmd.getBootstrapTokens().isEmpty()); assertTrue(tmd.getLeavingEndpoints().isEmpty()); @@ -516,6 +520,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -527,28 +532,28 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); // node 2 leaves with _different_ token - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(0)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.leaving(keyTokens.get(0))); assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(0))); assertTrue(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null); // go to boostrap - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(1))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().size() == 1); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(2))); // jump to leaving again - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.leaving(keyTokens.get(1))); assertTrue(tmd.getEndpoint(keyTokens.get(1)).equals(hosts.get(2))); assertTrue(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().isEmpty()); // go to state left - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.left(keyTokens.get(1))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -563,6 +568,7 @@ public class MoveTest extends CleanupHel TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); + ApplicationState.ApplicationStateFactory stateFactory = new ApplicationState.ApplicationStateFactory(partitioner); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); @@ -574,19 +580,19 @@ public class MoveTest extends CleanupHel createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); // node hosts.get(2) goes jumps to left - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.left(endpointTokens.get(2))); assertFalse(tmd.isMember(hosts.get(2))); // node hosts.get(4) goes to bootstrap - ss.onChange(hosts.get(3), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(3), ApplicationState.STATE_MOVE, stateFactory.bootstrapping(keyTokens.get(1))); assertFalse(tmd.isMember(hosts.get(3))); assertTrue(tmd.getBootstrapTokens().size() == 1); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // and then directly to 'left' - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(1)))); + ss.onChange(hosts.get(2), ApplicationState.STATE_MOVE, stateFactory.left(keyTokens.get(1))); assertTrue(tmd.getBootstrapTokens().size() == 0); assertFalse(tmd.isMember(hosts.get(2))); @@ -611,7 +617,7 @@ public class MoveTest extends CleanupHel for (int i=0; i<endpointTokens.size(); i++) { InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); - ss.onChange(ep, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(endpointTokens.get(i)))); + ss.onChange(ep, ApplicationState.STATE_MOVE, new ApplicationState.ApplicationStateFactory(partitioner).normal(endpointTokens.get(i))); hosts.add(ep); }