Author: brandonwilliams
Date: Thu Feb  3 20:17:06 2011
New Revision: 1066940

URL: http://svn.apache.org/viewvc?rev=1066940&view=rev
Log:
Fix a race condition during decommission: announce that the node has left
for RING_DELAY ms, and do not remove an endpoint's state until the endpoint
is removed from justRemovedEndpoints (i.e. when its quarantine expires).
Patch by brandonwilliams, reviewed by gdusbabek for CASSANDRA-2072

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1066940&r1=1066939&r2=1066940&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 Thu Feb  3 20:17:06 2011
@@ -255,7 +255,7 @@ public class Gossiper implements IFailur
 
         liveEndpoints_.remove(endpoint);
         unreachableEndpoints_.remove(endpoint);
-        endpointStateMap_.remove(endpoint);
+        // do not remove endpointState until the quarantine expires
         FailureDetector.instance.remove(endpoint);
         justRemovedEndpoints_.put(endpoint, System.currentTimeMillis());
     }
@@ -326,8 +326,6 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
-        if (logger_.isTraceEnabled())
-            logger_.trace("@@@@ Size of GossipDigestAckMessage is " + 
bos.toByteArray().length);
         return new Message(localEndpoint_, 
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
     }
 
@@ -436,7 +434,8 @@ public class Gossiper implements IFailur
                     else
                     {
                         logger_.info("FatClient " + endpoint + " has been 
silent for " + FatClientTimeout_ + "ms, removing from gossip");
-                        removeEndpoint(endpoint);
+                        if (!justRemovedEndpoints_.containsKey(endpoint)) // 
if the node was decommissioned, it will have been removed but still appear as a 
fat client
+                            removeEndpoint(endpoint); // after quarantine 
justRemoveEndpoints will remove the state
                     }
                 }
 
@@ -457,6 +456,7 @@ public class Gossiper implements IFailur
                     if (logger_.isDebugEnabled())
                         logger_.debug(QUARANTINE_DELAY + " elapsed, " + 
entry.getKey() + " gossip quarantine over");
                     justRemovedEndpoints_.remove(entry.getKey());
+                    endpointStateMap_.remove(entry.getKey());
                 }
             }
         }
@@ -469,8 +469,6 @@ public class Gossiper implements IFailur
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
     {
-        if (logger_.isTraceEnabled())
-            logger_.trace("Scanning for state greater than " + version + " for 
" + forEndpoint);
         EndpointState epState = endpointStateMap_.get(forEndpoint);
         EndpointState reqdEndpointState = null;
 
@@ -488,6 +486,8 @@ public class Gossiper implements IFailur
             if ( localHbVersion > version )
             {
                 reqdEndpointState = new 
EndpointState(epState.getHeartBeatState());
+                if (logger_.isTraceEnabled())
+                    logger_.trace("local heartbeat version " + localHbVersion 
+ " greater than " + version + " for " + forEndpoint);
             }
             /* Accumulate all application states whose versions are greater 
than "version" variable */
             for (Entry<ApplicationState, VersionedValue> entry : 
epState.getApplicationStateMap().entrySet())
@@ -668,6 +668,11 @@ public class Gossiper implements IFailur
                            applyApplicationStateLocally(ep, localEpStatePtr, 
remoteState);
                        }
                }
+                else
+                {
+                    if (logger_.isTraceEnabled())
+                        logger_.trace("Ignoring remote generation " + 
remoteGeneration + " < " + localGeneration);
+                }
             }
             else
             {
@@ -683,6 +688,8 @@ public class Gossiper implements IFailur
 
         if ( remoteHbState.getGeneration() > localHbState.getGeneration() )
         {
+            if (logger_.isTraceEnabled())
+                logger_.trace("Updating heartbeat state generation to " + 
remoteHbState.getGeneration() + " from " + localHbState.getGeneration() + " for 
" + addr);
             localState.setHeartBeatState(remoteHbState);
         }
         if ( localHbState.getGeneration() == remoteHbState.getGeneration() )
@@ -694,6 +701,11 @@ public class Gossiper implements IFailur
                 if (logger_.isTraceEnabled())
                     logger_.trace("Updating heartbeat state version to " + 
localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + 
" for " + addr + " ...");
             }
+            else
+            {
+                if (logger_.isTraceEnabled())
+                    logger_.trace("Ignoring lower version " + 
remoteHbState.getHeartBeatVersion() + " for " + addr + " which is lower than " 
+ localHbState.getHeartBeatVersion());
+            }
         }
     }
 
@@ -777,6 +789,8 @@ public class Gossiper implements IFailur
     {
         /* We are here since we have no data for this endpoint locally so 
request everthing. */
         deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), 
remoteGeneration, 0) );
+        if (logger_.isTraceEnabled())
+            logger_.trace("requestAll for " + gDigest.getEndpoint());
     }
 
     /* Send all the data with version greater than maxRemoteVersion */

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066940&r1=1066939&r2=1066940&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
 Thu Feb  3 20:17:06 2011
@@ -1602,9 +1602,10 @@ public class StorageService implements I
         calculatePendingRanges();
 
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.left(getLocalToken()));
+        logger_.info("Announcing that I have left the ring for " + RING_DELAY 
+ "ms");
         try
         {
-            Thread.sleep(2 * Gossiper.intervalInMillis_);
+            Thread.sleep(RING_DELAY);
         }
         catch (InterruptedException e)
         {


Reply via email to