Author: brandonwilliams Date: Thu Feb 3 20:17:06 2011 New Revision: 1066940
URL: http://svn.apache.org/viewvc?rev=1066940&view=rev Log: Fix race condition during decommission by announcing for RING_DELAY and not removing endpoint state until removing the ep from justRemovedEndpoints. Patch by brandonwilliams, reviewed by gdusbabek for CASSANDRA-2072 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1066940&r1=1066939&r2=1066940&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Thu Feb 3 20:17:06 2011 @@ -255,7 +255,7 @@ public class Gossiper implements IFailur liveEndpoints_.remove(endpoint); unreachableEndpoints_.remove(endpoint); - endpointStateMap_.remove(endpoint); + // do not remove endpointState until the quarantine expires FailureDetector.instance.remove(endpoint); justRemovedEndpoints_.put(endpoint, System.currentTimeMillis()); } @@ -326,8 +326,6 @@ public class Gossiper implements IFailur ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos); - if (logger_.isTraceEnabled()) - logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length); return new Message(localEndpoint_, StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray()); } @@ -436,7 +434,8 @@ public class Gossiper implements IFailur else { logger_.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout_ + "ms, removing from gossip"); - removeEndpoint(endpoint); + if (!justRemovedEndpoints_.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client + removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state } } @@ -457,6 +456,7 @@ public class Gossiper implements IFailur if (logger_.isDebugEnabled()) logger_.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over"); justRemovedEndpoints_.remove(entry.getKey()); + endpointStateMap_.remove(entry.getKey()); } } } @@ -469,8 +469,6 @@ public class Gossiper implements IFailur EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) { - if (logger_.isTraceEnabled()) - logger_.trace("Scanning for state greater than " + version + " for " + forEndpoint); EndpointState epState = endpointStateMap_.get(forEndpoint); EndpointState reqdEndpointState = null; @@ -488,6 +486,8 @@ public class Gossiper implements IFailur if ( localHbVersion > version ) { reqdEndpointState = new EndpointState(epState.getHeartBeatState()); + if (logger_.isTraceEnabled()) + logger_.trace("local heartbeat version " + localHbVersion + " greater than " + version + " for " + forEndpoint); } /* Accumulate all application states whose versions are greater than "version" variable */ for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet()) @@ -668,6 +668,11 @@ public class Gossiper implements IFailur applyApplicationStateLocally(ep, localEpStatePtr, remoteState); } } + else + { + if (logger_.isTraceEnabled()) + logger_.trace("Ignoring remote generation " + remoteGeneration + " < " + localGeneration); + } } else { @@ -683,6 +688,8 @@ public class Gossiper implements IFailur if ( remoteHbState.getGeneration() > localHbState.getGeneration() ) { + if (logger_.isTraceEnabled()) + logger_.trace("Updating heartbeat state generation to " + remoteHbState.getGeneration() + " from " + localHbState.getGeneration() + " for " + addr); localState.setHeartBeatState(remoteHbState); } if ( localHbState.getGeneration() == remoteHbState.getGeneration() ) @@ -694,6 +701,11 @@ public class Gossiper implements IFailur if (logger_.isTraceEnabled()) logger_.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); } + else + { + if (logger_.isTraceEnabled()) + logger_.trace("Ignoring lower version " + remoteHbState.getHeartBeatVersion() + " for " + addr + " which is lower than " + localHbState.getHeartBeatVersion()); + } } } @@ -777,6 +789,8 @@ public class Gossiper implements IFailur { /* We are here since we have no data for this endpoint locally so request everthing. */ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0) ); + if (logger_.isTraceEnabled()) + logger_.trace("requestAll for " + gDigest.getEndpoint()); } /* Send all the data with version greater than maxRemoteVersion */ Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1066940&r1=1066939&r2=1066940&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Thu Feb 3 20:17:06 2011 @@ -1602,9 +1602,10 @@ public class StorageService implements I calculatePendingRanges(); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken())); + logger_.info("Announcing that I have left the ring for " + RING_DELAY + "ms"); try { - Thread.sleep(2 * Gossiper.intervalInMillis_); + Thread.sleep(RING_DELAY); } catch (InterruptedException e) {