Author: jbellis
Date: Tue Apr 27 18:35:21 2010
New Revision: 938597

URL: http://svn.apache.org/viewvc?rev=938597&view=rev
Log:
Replace synchronization in Gossiper with concurrent data structures and volatile fields. Also removes getSortedApplicationStates since nothing actually seems to rely on iterating in sorted order. Patch by Brandon Williams and jbellis for CASSANDRA-757

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=938597&r1=938596&r2=938597&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Tue Apr 27 18:35:21 2010 @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.*; import org.apache.cassandra.io.ICompactSerializer; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,25 +35,21 @@ import org.slf4j.LoggerFactory; public class EndpointState { - private static ICompactSerializer<EndpointState> serializer_; - static - { - serializer_ = new EndpointStateSerializer(); - } - - HeartBeatState hbState_; - Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>(); + private final static ICompactSerializer<EndpointState> serializer_ = new EndpointStateSerializer(); + + volatile HeartBeatState hbState_; + final Map<String, ApplicationState> applicationState_ = new NonBlockingHashMap<String, ApplicationState>(); /* fields below do not get serialized */ - long updateTimestamp_; - boolean isAlive_; - boolean isAGossiper_; + volatile long updateTimestamp_; + volatile boolean isAlive_; + volatile boolean isAGossiper_; // whether this endpoint has token associated with it or not. Initially set false for all // endpoints. After certain time of inactivity, gossiper will examine if this node has a // token or not and will set this true if token is found. 
If there is no token, this is a // fat client and will be removed automatically from gossip. - boolean hasToken_; + volatile boolean hasToken_; public static ICompactSerializer<EndpointState> serializer() { @@ -73,17 +70,21 @@ public class EndpointState return hbState_; } - synchronized void setHeartBeatState(HeartBeatState hbState) + void setHeartBeatState(HeartBeatState hbState) { updateTimestamp(); hbState_ = hbState; } - + public ApplicationState getApplicationState(String key) { return applicationState_.get(key); } - + + /** + * TODO replace this with operations that don't expose private state + */ + @Deprecated public Map<String, ApplicationState> getApplicationStateMap() { return applicationState_; @@ -100,7 +101,7 @@ public class EndpointState return updateTimestamp_; } - synchronized void updateTimestamp() + void updateTimestamp() { updateTimestamp_ = System.currentTimeMillis(); } @@ -110,7 +111,7 @@ public class EndpointState return isAlive_; } - synchronized void isAlive(boolean value) + void isAlive(boolean value) { isAlive_ = value; } @@ -121,13 +122,13 @@ public class EndpointState return isAGossiper_; } - synchronized void isAGossiper(boolean value) + void isAGossiper(boolean value) { //isAlive_ = false; isAGossiper_ = value; } - public synchronized void setHasToken(boolean value) + public void setHasToken(boolean value) { hasToken_ = value; } @@ -136,22 +137,6 @@ public class EndpointState { return hasToken_; } - - public List<Map.Entry<String,ApplicationState>> getSortedApplicationStates() - { - ArrayList<Map.Entry<String, ApplicationState>> entries = new ArrayList<Map.Entry<String, ApplicationState>>(); - entries.addAll(applicationState_.entrySet()); - Collections.sort(entries, new Comparator<Map.Entry<String, ApplicationState>>() - { - public int compare(Map.Entry<String, ApplicationState> lhs, Map.Entry<String, ApplicationState> rhs) - { - return lhs.getValue().compareTo(rhs.getValue()); - } - }); - - return entries; - } - } class 
EndpointStateSerializer implements ICompactSerializer<EndpointState> Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=938597&r1=938596&r2=938597&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Apr 27 18:35:21 2010 @@ -21,6 +21,9 @@ package org.apache.cassandra.gms; import java.io.*; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; import java.net.InetAddress; import org.apache.cassandra.concurrent.StageManager; @@ -52,45 +55,42 @@ public class Gossiper implements IFailur { try { - synchronized( Gossiper.instance ) - { - /* Update the local heartbeat counter. */ - endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat(); - List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); - Gossiper.instance.makeRandomGossipDigest(gDigests); + /* Update the local heartbeat counter. */ + endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat(); + List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + Gossiper.instance.makeRandomGossipDigest(gDigests); + + if ( gDigests.size() > 0 ) + { + Message message = makeGossipDigestSynMessage(gDigests); + /* Gossip to some random live member */ + boolean gossipedToSeed = doGossipToLiveMember(message); + + /* Gossip to some unreachable member with some probability to check if he is back up */ + doGossipToUnreachableMember(message); + + /* Gossip to a seed if we did not do so above, or we have seen less nodes + than there are seeds. This prevents partitions where each group of nodes + is only gossiping to a subset of the seeds. 
+ + The most straightforward check would be to check that all the seeds have been + verified either as live or unreachable. To avoid that computation each round, + we reason that: + + either all the live nodes are seeds, in which case non-seeds that come online + will introduce themselves to a member of the ring by definition, + + or there is at least one non-seed node in the list, in which case eventually + someone will gossip to it, and then do a gossip to a random seed from the + gossipedToSeed check. + + See CASSANDRA-150 for more exposition. */ + if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) + doGossipToSeed(message); - if ( gDigests.size() > 0 ) - { - Message message = makeGossipDigestSynMessage(gDigests); - /* Gossip to some random live member */ - boolean gossipedToSeed = doGossipToLiveMember(message); - - /* Gossip to some unreachable member with some probability to check if he is back up */ - doGossipToUnreachableMember(message); - - /* Gossip to a seed if we did not do so above, or we have seen less nodes - than there are seeds. This prevents partitions where each group of nodes - is only gossiping to a subset of the seeds. - - The most straightforward check would be to check that all the seeds have been - verified either as live or unreachable. To avoid that computation each round, - we reason that: - - either all the live nodes are seeds, in which case non-seeds that come online - will introduce themselves to a member of the ring by definition, - - or there is at least one non-seed node in the list, in which case eventually - someone will gossip to it, and then do a gossip to a random seed from the - gossipedToSeed check. - - See CASSANDRA-150 for more exposition. 
*/ - if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) - doGossipToSeed(message); - - if (logger_.isTraceEnabled()) - logger_.trace("Performing status check ..."); - doStatusCheck(); - } + if (logger_.isTraceEnabled()) + logger_.trace("Performing status check ..."); + doStatusCheck(); } } catch (Exception e) @@ -110,27 +110,34 @@ public class Gossiper implements IFailur private long aVeryLongTime_; private long FatClientTimeout_; private Random random_ = new Random(); + private Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>() + { + public int compare(InetAddress addr1, InetAddress addr2) + { + return addr1.getHostAddress().compareTo(addr2.getHostAddress()); + } + }; /* subscribers for interest in EndpointState change */ - private List<IEndpointStateChangeSubscriber> subscribers_ = new ArrayList<IEndpointStateChangeSubscriber>(); + private List<IEndpointStateChangeSubscriber> subscribers_ = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>(); /* live member set */ - private Set<InetAddress> liveEndpoints_ = new HashSet<InetAddress>(); + private Set<InetAddress> liveEndpoints_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* unreachable member set */ - private Set<InetAddress> unreachableEndpoints_ = new HashSet<InetAddress>(); + private Set<InetAddress> unreachableEndpoints_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* initial seeds for joining the cluster */ - private Set<InetAddress> seeds_ = new HashSet<InetAddress>(); + private Set<InetAddress> seeds_ = new ConcurrentSkipListSet<InetAddress>(inetcomparator); /* map where key is the endpoint and value is the state associated with the endpoint */ - Map<InetAddress, EndpointState> endpointStateMap_ = new Hashtable<InetAddress, EndpointState>(); + Map<InetAddress, EndpointState> endpointStateMap_ = new ConcurrentHashMap<InetAddress, EndpointState>(); /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. 
We will ignore any gossip regarding these endpoints for Streaming.RING_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal * gossip gets propagated to all nodes */ - Map<InetAddress, Long> justRemovedEndpoints_ = new Hashtable<InetAddress, Long>(); + Map<InetAddress, Long> justRemovedEndpoints_ = new ConcurrentHashMap<InetAddress, Long>(); private Gossiper() { @@ -144,12 +151,12 @@ public class Gossiper implements IFailur } /** Register with the Gossiper for EndpointState notifications */ - public synchronized void register(IEndpointStateChangeSubscriber subscriber) + public void register(IEndpointStateChangeSubscriber subscriber) { subscribers_.add(subscriber); } - public synchronized void unregister(IEndpointStateChangeSubscriber subscriber) + public void unregister(IEndpointStateChangeSubscriber subscriber) { subscribers_.remove(subscriber); } @@ -224,8 +231,7 @@ public class Gossiper implements IFailur } /** - * No locking required since it is called from a method that already - * has acquired a lock. The gossip digest is built based on randomization + * The gossip digest is built based on randomization * rather than just looping through the collection of live endpoints. * * @param gDigests list of Gossip Digests. 
@@ -431,7 +437,7 @@ public class Gossiper implements IFailur return endpointStateMap_.get(ep); } - synchronized EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) + EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) { if (logger_.isTraceEnabled()) logger_.trace("Scanning for state greater than " + version + " for " + forEndpoint); @@ -588,7 +594,7 @@ public class Gossiper implements IFailur subscriber.onJoin(ep, epState); } - synchronized void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) + void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) { for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) { @@ -657,7 +663,7 @@ public class Gossiper implements IFailur { Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationStateMap(); - for (Map.Entry<String,ApplicationState> remoteEntry : remoteStatePtr.getSortedApplicationStates()) + for (Map.Entry<String,ApplicationState> remoteEntry : remoteStatePtr.getApplicationStateMap().entrySet()) { String remoteKey = remoteEntry.getKey(); ApplicationState remoteAppState = remoteEntry.getValue(); @@ -706,7 +712,7 @@ public class Gossiper implements IFailur } } - synchronized void isAlive(InetAddress addr, EndpointState epState, boolean value) + void isAlive(InetAddress addr, EndpointState epState, boolean value) { epState.isAlive(value); if (value) @@ -747,7 +753,7 @@ public class Gossiper implements IFailur This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests and the delta state are built up. 
*/ - synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) + void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) { for ( GossipDigest gDigest : gDigestList ) { @@ -837,7 +843,7 @@ public class Gossiper implements IFailur gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_); } - public synchronized void addLocalApplicationState(String key, ApplicationState appState) + public void addLocalApplicationState(String key, ApplicationState appState) { assert !StorageService.instance.isClientMode(); EndpointState epState = endpointStateMap_.get(localEndpoint_); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=938597&r1=938596&r2=938597&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Apr 27 18:35:21 2010 @@ -890,7 +890,7 @@ public class StorageService implements I public void onJoin(InetAddress endpoint, EndpointState epState) { - for (Map.Entry<String,ApplicationState> entry : epState.getSortedApplicationStates()) + for (Map.Entry<String,ApplicationState> entry : epState.getApplicationStateMap().entrySet()) { onChange(endpoint, entry.getKey(), entry.getValue()); }