Author: jbellis
Date: Tue Apr 27 18:35:21 2010
New Revision: 938597

URL: http://svn.apache.org/viewvc?rev=938597&view=rev
Log:
Replace synchronization in Gossiper with concurrent data structures and
volatile fields.  Also remove getSortedApplicationStates, since nothing
actually seems to rely on iterating in sorted order.  Patch by Brandon Williams
and jbellis for CASSANDRA-757.

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Tue 
Apr 27 18:35:21 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.*;
 import org.apache.cassandra.io.ICompactSerializer;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,25 +35,21 @@ import org.slf4j.LoggerFactory;
 
 public class EndpointState
 {
-    private static ICompactSerializer<EndpointState> serializer_;
-    static
-    {
-        serializer_ = new EndpointStateSerializer();
-    }
-    
-    HeartBeatState hbState_;
-    Map<String, ApplicationState> applicationState_ = new Hashtable<String, 
ApplicationState>();
+    private final static ICompactSerializer<EndpointState> serializer_ = new 
EndpointStateSerializer();
+
+    volatile HeartBeatState hbState_;
+    final Map<String, ApplicationState> applicationState_ = new 
NonBlockingHashMap<String, ApplicationState>();
     
     /* fields below do not get serialized */
-    long updateTimestamp_;
-    boolean isAlive_;
-    boolean isAGossiper_;
+    volatile long updateTimestamp_;
+    volatile boolean isAlive_;
+    volatile boolean isAGossiper_;
 
     // whether this endpoint has token associated with it or not. Initially 
set false for all
     // endpoints. After certain time of inactivity, gossiper will examine if 
this node has a
     // token or not and will set this true if token is found. If there is no 
token, this is a
     // fat client and will be removed automatically from gossip.
-    boolean hasToken_;
+    volatile boolean hasToken_;
 
     public static ICompactSerializer<EndpointState> serializer()
     {
@@ -73,17 +70,21 @@ public class EndpointState
         return hbState_;
     }
     
-    synchronized void setHeartBeatState(HeartBeatState hbState)
+    void setHeartBeatState(HeartBeatState hbState)
     {
         updateTimestamp();
         hbState_ = hbState;
     }
-    
+
     public ApplicationState getApplicationState(String key)
     {
         return applicationState_.get(key);
     }
-    
+
+    /**
+     * TODO replace this with operations that don't expose private state
+     */
+    @Deprecated
     public Map<String, ApplicationState> getApplicationStateMap()
     {
         return applicationState_;
@@ -100,7 +101,7 @@ public class EndpointState
         return updateTimestamp_;
     }
     
-    synchronized void updateTimestamp()
+    void updateTimestamp()
     {
         updateTimestamp_ = System.currentTimeMillis();
     }
@@ -110,7 +111,7 @@ public class EndpointState
         return isAlive_;
     }
 
-    synchronized void isAlive(boolean value)
+    void isAlive(boolean value)
     {        
         isAlive_ = value;        
     }
@@ -121,13 +122,13 @@ public class EndpointState
         return isAGossiper_;
     }
 
-    synchronized void isAGossiper(boolean value)
+    void isAGossiper(boolean value)
     {                
         //isAlive_ = false;
         isAGossiper_ = value;        
     }
 
-    public synchronized void setHasToken(boolean value)
+    public void setHasToken(boolean value)
     {
         hasToken_ = value;
     }
@@ -136,22 +137,6 @@ public class EndpointState
     {
         return hasToken_;
     }
-
-    public List<Map.Entry<String,ApplicationState>> 
getSortedApplicationStates()
-    {
-        ArrayList<Map.Entry<String, ApplicationState>> entries = new 
ArrayList<Map.Entry<String, ApplicationState>>();
-        entries.addAll(applicationState_.entrySet());
-        Collections.sort(entries, new Comparator<Map.Entry<String, 
ApplicationState>>()
-        {
-            public int compare(Map.Entry<String, ApplicationState> lhs, 
Map.Entry<String, ApplicationState> rhs)
-            {
-                return lhs.getValue().compareTo(rhs.getValue());
-            }
-        });
-
-        return entries;
-    }
-
 }
 
 class EndpointStateSerializer implements ICompactSerializer<EndpointState>

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Apr 27 
18:35:21 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.gms;
 import java.io.*;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.net.InetAddress;
 
 import org.apache.cassandra.concurrent.StageManager;
@@ -52,45 +55,42 @@ public class Gossiper implements IFailur
         {
             try
             {
-                synchronized( Gossiper.instance )
-                {
-                       /* Update the local heartbeat counter. */
-                    
endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat();
-                    List<GossipDigest> gDigests = new 
ArrayList<GossipDigest>();
-                    Gossiper.instance.makeRandomGossipDigest(gDigests);
+                /* Update the local heartbeat counter. */
+                
endpointStateMap_.get(localEndpoint_).getHeartBeatState().updateHeartBeat();
+                List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+                Gossiper.instance.makeRandomGossipDigest(gDigests);
+
+                if ( gDigests.size() > 0 )
+                {
+                    Message message = makeGossipDigestSynMessage(gDigests);
+                    /* Gossip to some random live member */
+                    boolean gossipedToSeed = doGossipToLiveMember(message);
+
+                    /* Gossip to some unreachable member with some probability 
to check if he is back up */
+                    doGossipToUnreachableMember(message);
+
+                    /* Gossip to a seed if we did not do so above, or we have 
seen less nodes
+                       than there are seeds.  This prevents partitions where 
each group of nodes
+                       is only gossiping to a subset of the seeds.
+
+                       The most straightforward check would be to check that 
all the seeds have been
+                       verified either as live or unreachable.  To avoid that 
computation each round,
+                       we reason that:
+
+                       either all the live nodes are seeds, in which case 
non-seeds that come online
+                       will introduce themselves to a member of the ring by 
definition,
+
+                       or there is at least one non-seed node in the list, in 
which case eventually
+                       someone will gossip to it, and then do a gossip to a 
random seed from the
+                       gossipedToSeed check.
+
+                       See CASSANDRA-150 for more exposition. */
+                    if (!gossipedToSeed || liveEndpoints_.size() < 
seeds_.size())
+                        doGossipToSeed(message);
 
-                    if ( gDigests.size() > 0 )
-                    {
-                        Message message = makeGossipDigestSynMessage(gDigests);
-                        /* Gossip to some random live member */
-                        boolean gossipedToSeed = doGossipToLiveMember(message);
-
-                        /* Gossip to some unreachable member with some 
probability to check if he is back up */
-                        doGossipToUnreachableMember(message);
-
-                        /* Gossip to a seed if we did not do so above, or we 
have seen less nodes
-                           than there are seeds.  This prevents partitions 
where each group of nodes
-                           is only gossiping to a subset of the seeds.
-
-                           The most straightforward check would be to check 
that all the seeds have been
-                           verified either as live or unreachable.  To avoid 
that computation each round,
-                           we reason that:
-
-                           either all the live nodes are seeds, in which case 
non-seeds that come online
-                           will introduce themselves to a member of the ring 
by definition,
-
-                           or there is at least one non-seed node in the list, 
in which case eventually
-                           someone will gossip to it, and then do a gossip to 
a random seed from the
-                           gossipedToSeed check.
-
-                           See CASSANDRA-150 for more exposition. */
-                        if (!gossipedToSeed || liveEndpoints_.size() < 
seeds_.size())
-                            doGossipToSeed(message);
-
-                        if (logger_.isTraceEnabled())
-                            logger_.trace("Performing status check ...");
-                        doStatusCheck();
-                    }
+                    if (logger_.isTraceEnabled())
+                        logger_.trace("Performing status check ...");
+                    doStatusCheck();
                 }
             }
             catch (Exception e)
@@ -110,27 +110,34 @@ public class Gossiper implements IFailur
     private long aVeryLongTime_;
     private long FatClientTimeout_;
     private Random random_ = new Random();
+    private Comparator<InetAddress> inetcomparator = new 
Comparator<InetAddress>()
+    {
+        public int compare(InetAddress addr1,  InetAddress addr2)
+        {
+            return addr1.getHostAddress().compareTo(addr2.getHostAddress());
+        }
+    };
 
     /* subscribers for interest in EndpointState change */
-    private List<IEndpointStateChangeSubscriber> subscribers_ = new 
ArrayList<IEndpointStateChangeSubscriber>();
+    private List<IEndpointStateChangeSubscriber> subscribers_ = new 
CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
 
     /* live member set */
-    private Set<InetAddress> liveEndpoints_ = new HashSet<InetAddress>();
+    private Set<InetAddress> liveEndpoints_ = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
 
     /* unreachable member set */
-    private Set<InetAddress> unreachableEndpoints_ = new 
HashSet<InetAddress>();
+    private Set<InetAddress> unreachableEndpoints_ = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
 
     /* initial seeds for joining the cluster */
-    private Set<InetAddress> seeds_ = new HashSet<InetAddress>();
+    private Set<InetAddress> seeds_ = new 
ConcurrentSkipListSet<InetAddress>(inetcomparator);
 
     /* map where key is the endpoint and value is the state associated with 
the endpoint */
-    Map<InetAddress, EndpointState> endpointStateMap_ = new 
Hashtable<InetAddress, EndpointState>();
+    Map<InetAddress, EndpointState> endpointStateMap_ = new 
ConcurrentHashMap<InetAddress, EndpointState>();
 
     /* map where key is endpoint and value is timestamp when this endpoint was 
removed from
      * gossip. We will ignore any gossip regarding these endpoints for 
Streaming.RING_DELAY time
      * after removal to prevent nodes from falsely reincarnating during the 
time when removal
      * gossip gets propagated to all nodes */
-    Map<InetAddress, Long> justRemovedEndpoints_ = new Hashtable<InetAddress, 
Long>();
+    Map<InetAddress, Long> justRemovedEndpoints_ = new 
ConcurrentHashMap<InetAddress, Long>();
 
     private Gossiper()
     {
@@ -144,12 +151,12 @@ public class Gossiper implements IFailur
     }
 
     /** Register with the Gossiper for EndpointState notifications */
-    public synchronized void register(IEndpointStateChangeSubscriber 
subscriber)
+    public void register(IEndpointStateChangeSubscriber subscriber)
     {
         subscribers_.add(subscriber);
     }
 
-    public synchronized void unregister(IEndpointStateChangeSubscriber 
subscriber)
+    public void unregister(IEndpointStateChangeSubscriber subscriber)
     {
         subscribers_.remove(subscriber);
     }
@@ -224,8 +231,7 @@ public class Gossiper implements IFailur
     }
 
     /**
-     * No locking required since it is called from a method that already
-     * has acquired a lock. The gossip digest is built based on randomization
+     * The gossip digest is built based on randomization
      * rather than just looping through the collection of live endpoints.
      *
      * @param gDigests list of Gossip Digests.
@@ -431,7 +437,7 @@ public class Gossiper implements IFailur
         return endpointStateMap_.get(ep);
     }
 
-    synchronized EndpointState getStateForVersionBiggerThan(InetAddress 
forEndpoint, int version)
+    EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
     {
         if (logger_.isTraceEnabled())
             logger_.trace("Scanning for state greater than " + version + " for 
" + forEndpoint);
@@ -588,7 +594,7 @@ public class Gossiper implements IFailur
             subscriber.onJoin(ep, epState);
     }
 
-    synchronized void applyStateLocally(Map<InetAddress, EndpointState> 
epStateMap)
+    void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
     {
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
         {
@@ -657,7 +663,7 @@ public class Gossiper implements IFailur
     {
         Map<String, ApplicationState> localAppStateMap = 
localStatePtr.getApplicationStateMap();
 
-        for (Map.Entry<String,ApplicationState> remoteEntry : 
remoteStatePtr.getSortedApplicationStates())
+        for (Map.Entry<String,ApplicationState> remoteEntry : 
remoteStatePtr.getApplicationStateMap().entrySet())
         {
             String remoteKey = remoteEntry.getKey();
             ApplicationState remoteAppState = remoteEntry.getValue();
@@ -706,7 +712,7 @@ public class Gossiper implements IFailur
         }
     }
 
-    synchronized void isAlive(InetAddress addr, EndpointState epState, boolean 
value)
+    void isAlive(InetAddress addr, EndpointState epState, boolean value)
     {
         epState.isAlive(value);
         if (value)
@@ -747,7 +753,7 @@ public class Gossiper implements IFailur
         This method is used to figure the state that the Gossiper has but 
Gossipee doesn't. The delta digests
         and the delta state are built up.
     */
-    synchronized void examineGossiper(List<GossipDigest> gDigestList, 
List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> 
deltaEpStateMap)
+    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> 
deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
     {
         for ( GossipDigest gDigest : gDigestList )
         {
@@ -837,7 +843,7 @@ public class Gossiper implements IFailur
         gossipTimer_.schedule( new GossipTimerTask(), 
Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
     }
 
-    public synchronized void addLocalApplicationState(String key, 
ApplicationState appState)
+    public void addLocalApplicationState(String key, ApplicationState appState)
     {
         assert !StorageService.instance.isClientMode();
         EndpointState epState = endpointStateMap_.get(localEndpoint_);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=938597&r1=938596&r2=938597&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Tue Apr 27 18:35:21 2010
@@ -890,7 +890,7 @@ public class StorageService implements I
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {
-        for (Map.Entry<String,ApplicationState> entry : 
epState.getSortedApplicationStates())
+        for (Map.Entry<String,ApplicationState> entry : 
epState.getApplicationStateMap().entrySet())
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }


Reply via email to