Fix duplicate up/down messages sent to native clients. Patch by Stefania; reviewed by brandonwilliams for CASSANDRA-7816.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2199a87a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2199a87a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2199a87a Branch: refs/heads/cassandra-2.1 Commit: 2199a87aab8322c41f1b590c0fd8f08f448952ca Parents: 77c66bf Author: Brandon Williams <brandonwilli...@apache.org> Authored: Fri Mar 13 08:02:12 2015 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Fri Mar 13 08:02:12 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/gms/EndpointState.java | 12 ++++++++++ src/java/org/apache/cassandra/gms/Gossiper.java | 25 +++++++++++++++----- 3 files changed, 32 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 382b3dd..8843908 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.14: + * Fix duplicate up/down messages sent to native clients (CASSANDRA-7816) * Expose commit log archive status via JMX (CASSANDRA-8734) * Provide better exceptions for invalid replication strategy parameters (CASSANDRA-8909) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 3df9155..518e575 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -46,12 +46,14 @@ public class EndpointState /* fields below do not get serialized */ private volatile long updateTimestamp; 
private volatile boolean isAlive; + private volatile boolean hasPendingEcho; EndpointState(HeartBeatState initialHbState) { hbState = initialHbState; updateTimestamp = System.nanoTime(); isAlive = true; + hasPendingEcho = false; } HeartBeatState getHeartBeatState() @@ -113,6 +115,16 @@ public class EndpointState isAlive = false; } + public boolean hasPendingEcho() + { + return hasPendingEcho; + } + + public void markPendingEcho(boolean val) + { + hasPendingEcho = val; + } + public String toString() { return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index a478405..97dc506 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -29,6 +29,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.utils.Pair; @@ -48,8 +49,6 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; -import com.google.common.collect.ImmutableList; - /** * This module is responsible for Gossiping information for the local endpoint. This abstraction * maintains the list of live and dead endpoints. Periodically i.e. 
every 1 second this module @@ -878,6 +877,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return; } + if (localState.hasPendingEcho()) + { + logger.debug("{} has already a pending echo, skipping it", localState); + return; + } + localState.markDead(); MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer); @@ -891,9 +896,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void response(MessageIn msg) { + localState.markPendingEcho(false); realMarkAlive(addr, localState); } }; + + localState.markPendingEcho(true); MessagingService.instance().sendRR(echoMessage, addr, echoHandler); } @@ -936,9 +944,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean */ private void handleMajorStateChange(InetAddress ep, EndpointState epState) { + EndpointState localEpState = endpointStateMap.get(ep); if (!isDeadState(epState)) { - if (endpointStateMap.get(ep) != null) + if (localEpState != null) logger.info("Node {} has restarted, now UP", ep); else logger.info("Node {} is now part of the cluster", ep); @@ -947,9 +956,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.trace("Adding endpoint state for " + ep); endpointStateMap.put(ep, epState); - // the node restarted: it is up to the subscriber to take whatever action is necessary - for (IEndpointStateChangeSubscriber subscriber : subscribers) - subscriber.onRestart(ep, epState); + if (localEpState != null) + { // the node restarted: it is up to the subscriber to take whatever action is necessary + for (IEndpointStateChangeSubscriber subscriber : subscribers) + subscriber.onRestart(ep, localEpState); + } if (!isDeadState(epState)) markAlive(ep, epState); @@ -994,6 +1005,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState localEpStatePtr = 
endpointStateMap.get(ep); EndpointState remoteState = entry.getValue(); + /* If state does not exist just add it. If it does then add it if the remote generation is greater. If there is a generation tie, attempt to break it by heartbeat version. @@ -1024,6 +1036,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } else if (logger.isTraceEnabled()) logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); + if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead markAlive(ep, localEpStatePtr); }