Author: brandonwilliams Date: Wed Oct 12 16:17:02 2011 New Revision: 1182457
URL: http://svn.apache.org/viewvc?rev=1182457&view=rev Log: Expire dead gossip states based on time. Patch by Jérémy Sevellec, reviewed by Paul Cannon and brandonwilliams for CASSANDRA-2961 Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1182457&r1=1182456&r2=1182457&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java Wed Oct 12 16:17:02 2011 @@ -75,7 +75,7 @@ public class Gossiper implements IFailur private static Logger logger = LoggerFactory.getLogger(Gossiper.class); public static final Gossiper instance = new Gossiper(); - private long aVeryLongTime; + public static final long aVeryLongTime = 259200 * 1000; // 3 days private long FatClientTimeout; private Random random = new Random(); private Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>() @@ -107,6 +107,8 @@ public class Gossiper implements IFailur * gossip gets propagated to all nodes */ private Map<InetAddress, Long> justRemovedEndpoints = new ConcurrentHashMap<InetAddress, Long>(); + private Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>(); + // protocol versions of the other nodes in the cluster private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>(); @@ -174,8 +176,6 @@ public class Gossiper implements IFailur private Gossiper() { - // 3 days - aVeryLongTime = 259200 * 1000; // half of QUARATINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip FatClientTimeout = (long)(QUARANTINE_DELAY / 2); /* register with the Failure Detector for receiving Failure detector events */ @@ -296,6 +296,7 @@ public class Gossiper implements IFailur { unreachableEndpoints.remove(endpoint); endpointStateMap.remove(endpoint); + expireTimeEndpointMap.remove(endpoint); justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); if (logger.isDebugEnabled()) logger.debug("evicting " + endpoint + " from gossip"); @@ -417,7 +418,7 @@ public class Gossiper implements IFailur EndpointState epState = endpointStateMap.get(endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token)); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token,computeExpireTime())); logger.info("Completing removal of " + endpoint); endpointStateMap.put(endpoint, epState); // ensure at least one gossip round occurs before returning @@ -572,8 +573,14 @@ public class Gossiper implements IFailur evictFromMembership(endpoint); // can get rid of the state immediately } - if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) + long expireTime = getExpireTimeForEndpoint(endpoint); + if (!epState.isAlive() && (now > expireTime) + && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) { + if (logger.isDebugEnabled()) + { + logger.debug("time is expiring for endpoint : " + endpoint + " (" + expireTime + ")"); + } evictFromMembership(endpoint); } } @@ -592,6 +599,17 @@ public class Gossiper implements IFailur } } } + + protected long getExpireTimeForEndpoint(InetAddress endpoint) + { + /* default expireTime is aVeryLongTime */ + long expireTime = computeExpireTime(); + if (expireTimeEndpointMap.containsKey(endpoint)) + { + expireTime = expireTimeEndpointMap.get(endpoint); + } + return expireTime; + } public EndpointState getEndpointStateForEndpoint(InetAddress ep) { @@ -719,6 +737,8 @@ public class Gossiper implements IFailur localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime liveEndpoints.add(addr); unreachableEndpoints.remove(addr); + expireTimeEndpointMap.remove(addr); + logger.debug("removing expire time for endpoint : " + addr); logger.info("InetAddress {} is now UP", addr); for (IEndpointStateChangeSubscriber subscriber : subscribers) subscriber.onAlive(addr, localState); @@ -1066,5 +1086,18 @@ public class Gossiper implements IFailur { return getCurrentGenerationNumber(InetAddress.getByName(address)); } + + public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime) + { + if (logger.isDebugEnabled()) + { + logger.debug("adding expire time for endpoint : " + endpoint + " (" + expireTime + ")"); + } + expireTimeEndpointMap.put(endpoint, expireTime); + } + + public static long computeExpireTime() { + return System.currentTimeMillis() + Gossiper.aVeryLongTime; + } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1182457&r1=1182456&r2=1182457&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java Wed Oct 12 16:17:02 2011 @@ -125,9 +125,10 @@ public class VersionedValue implements C return new VersionedValue(VersionedValue.STATUS_LEAVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue left(Token token) + public VersionedValue left(Token token, long expireTime) { - return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER + + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime); } public VersionedValue moving(Token token) @@ -140,9 +141,10 @@ public class VersionedValue implements C return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue removedNonlocal(Token token) + public VersionedValue removedNonlocal(Token token, long expireTime) { - return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime); } public VersionedValue removalCoordinator(Token token) Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1182457&r1=1182456&r2=1182457&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 12 16:17:02 2011 @@ -969,7 +969,7 @@ public class StorageService implements I if (logger_.isDebugEnabled()) logger_.debug("Node " + endpoint + " state left, token " + token); - excise(token, endpoint); + excise(token, endpoint, extractExpireTime(pieces)); } /** @@ -1021,7 +1021,7 @@ public class StorageService implements I if (VersionedValue.REMOVED_TOKEN.equals(state)) { - excise(removeToken, endpoint); + excise(removeToken, endpoint, extractExpireTime(pieces)); } else if (VersionedValue.REMOVING_TOKEN.equals(state)) { @@ -1054,6 +1054,30 @@ public class StorageService implements I SystemTable.removeToken(token); } } + + private void excise(Token token, InetAddress endpoint, long expireTime) + { + addExpireTimeIfFound(endpoint, expireTime); + excise(token, endpoint); + } + + protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime) + { + if (expireTime != 0L) + { + Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime); + } + } + + protected long extractExpireTime(String[] pieces) + { + long expireTime = 0L; + if (pieces.length >= 3) + { + expireTime = Long.parseLong(pieces[2]); + } + return expireTime; + } /** * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is: @@ -1978,7 +2002,7 @@ public class StorageService implements I tokenMetadata_.removeEndpoint(FBUtilities.getBroadcastAddress()); calculatePendingRanges(); - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken())); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken(),Gossiper.computeExpireTime())); logger_.info("Announcing that I have left the ring for " + RING_DELAY + "ms"); try { Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java?rev=1182457&r1=1182456&r2=1182457&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java (original) +++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java Wed Oct 12 16:17:02 2011 @@ -35,6 +35,7 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.dht.*; import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.SimpleSnitch; @@ -299,8 +300,10 @@ public class LeaveAndBootstrapTest exten // Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still // leaving and boot2 in progress - ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(LEAVING[0]))); - ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(LEAVING[2]))); + ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS, + valueFactory.left(endpointTokens.get(LEAVING[0]), Gossiper.computeExpireTime())); + ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS, + valueFactory.left(endpointTokens.get(LEAVING[2]), Gossiper.computeExpireTime())); ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5))); // adjust precalcuated results. this changes what the epected endpoints are. @@ -506,7 +509,8 @@ public class LeaveAndBootstrapTest exten // node 3 goes through leave and left and then jumps to normal at its new token ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(2))); - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, + valueFactory.left(keyTokens.get(2), Gossiper.computeExpireTime())); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4))); assertTrue(tmd.getBootstrapTokens().isEmpty()); @@ -556,7 +560,8 @@ public class LeaveAndBootstrapTest exten assertTrue(tmd.getBootstrapTokens().isEmpty()); // go to state left - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(1))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, + valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime())); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -583,7 +588,8 @@ public class LeaveAndBootstrapTest exten Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); // node hosts.get(2) goes jumps to left - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, + valueFactory.left(endpointTokens.get(2), Gossiper.computeExpireTime())); assertFalse(tmd.isMember(hosts.get(2))); @@ -595,7 +601,8 @@ public class LeaveAndBootstrapTest exten assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // and then directly to 'left' - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(1))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, + valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime())); assertTrue(tmd.getBootstrapTokens().size() == 0); assertFalse(tmd.isMember(hosts.get(2)));