Updated Branches: refs/heads/cassandra-1.2 81619fe9c -> a6ca5d496
Revert #5665 (b7e13b89c265c28acfb624a984b97a06a837c3ea) due to test failures Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2d90eb65 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2d90eb65 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2d90eb65 Branch: refs/heads/cassandra-1.2 Commit: 2d90eb65bffd5787ff77403a4f3bc05605cfcd5a Parents: 81619fe Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Jun 25 09:56:14 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Jun 25 09:56:14 2013 +0200 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/Gossiper.java | 46 +++++++++----------- 1 file changed, 21 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d90eb65/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index b629824..efa9865 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -871,7 +871,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean if (logger.isTraceEnabled()) logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep); // major state change will handle the update by inserting the remote state directly - copyNewerApplicationStates(remoteState, localEpStatePtr); handleMajorStateChange(ep, remoteState); } else if ( remoteGeneration == localGeneration ) // generation has not changed, apply new states @@ -881,18 +880,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean int remoteMaxVersion = 
getMaxEndpointStateVersion(remoteState); if ( remoteMaxVersion > localMaxVersion ) { - if (logger.isTraceEnabled()) - { - logger.trace("Updating heartbeat state version to " + remoteState.getHeartBeatState().getHeartBeatVersion() + - " from " + localEpStatePtr.getHeartBeatState().getHeartBeatVersion() + " for " + ep); - } - localEpStatePtr.setHeartBeatState(remoteState.getHeartBeatState()); - Map<ApplicationState, VersionedValue> merged = copyNewerApplicationStates(localEpStatePtr, remoteState); - for (Entry<ApplicationState, VersionedValue> appState : merged.entrySet()) - doNotifications(ep, appState.getKey(), appState.getValue()); + // apply states, but do not notify since there is no major change + applyNewStates(ep, localEpStatePtr, remoteState); } else if (logger.isTraceEnabled()) - logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); + logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead markAlive(ep, localEpStatePtr); } @@ -911,24 +903,28 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } - private Map<ApplicationState, VersionedValue> copyNewerApplicationStates(EndpointState toState, EndpointState fromState) + private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState) { - Map<ApplicationState, VersionedValue> merged = new HashMap<ApplicationState, VersionedValue>(); - for (Entry<ApplicationState, VersionedValue> fromEntry : fromState.getApplicationStateMap().entrySet()) + // don't assert here, since if the node restarts the version will go back to zero + int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); + + localState.setHeartBeatState(remoteState.getHeartBeatState()); + if (logger.isTraceEnabled()) + logger.trace("Updating heartbeat state version to " + 
localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); + + // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received + for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) { - ApplicationState key = fromEntry.getKey(); - VersionedValue value = fromEntry.getValue(); + ApplicationState remoteKey = remoteEntry.getKey(); + VersionedValue remoteValue = remoteEntry.getValue(); - if ( (toState.applicationState.containsKey(key) && toState.applicationState.get(key).compareTo(value) < 0) - || !toState.applicationState.containsKey(key) ) - { - if (logger.isTraceEnabled()) - logger.trace("merging {}:{} into ApplicationState", key, value); - toState.addApplicationState(key, value); - merged.put(key, value); - } + assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration(); + localState.addApplicationState(remoteKey, remoteValue); + } + for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) + { + doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue()); } - return merged; } // notify that an application state has changed