This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new c736d22cf8 Gossip doesn't converge due to race condition when updating EndpointStates multiple fields
c736d22cf8 is described below

commit c736d22cf855d0981bb514fce5cb3a149b8c2f43
Author: David Capwell <[email protected]>
AuthorDate: Wed May 21 14:39:45 2025 -0700

    Gossip doesn't converge due to race condition when updating EndpointStates multiple fields
    
    patch by David Capwell, Matt Byrd; reviewed by Blake Eggleston, Brandon Williams for CASSANDRA-20659
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/EndpointState.java    | 111 ++++++++++++++++-----
 src/java/org/apache/cassandra/gms/Gossiper.java    |  28 ++----
 .../org/apache/cassandra/gms/HeartBeatState.java   |  20 ++--
 .../org/apache/cassandra/gms/GossiperTest.java     |   2 +-
 .../apache/cassandra/gms/SerializationsTest.java   |   2 +-
 6 files changed, 109 insertions(+), 55 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 65e3cb4de0..676522b9bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.18
+ * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659)
  * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119)
  * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
  * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 782a72207c..6cec0cef34 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -24,6 +24,9 @@ import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.base.Function;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,16 +40,25 @@ import org.apache.cassandra.utils.CassandraVersion;
  * This abstraction represents both the HeartBeatState and the 
ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this 
instance.
  */
-
-
 public class EndpointState
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(EndpointState.class);
 
     public final static IVersionedSerializer<EndpointState> serializer = new 
EndpointStateSerializer();
 
-    private volatile HeartBeatState hbState;
-    private final AtomicReference<Map<ApplicationState, VersionedValue>> 
applicationState;
+    private static class View
+    {
+        final HeartBeatState hbState;
+        final Map<ApplicationState, VersionedValue> applicationState;
+
+        private View(HeartBeatState hbState, Map<ApplicationState, 
VersionedValue> applicationState)
+        {
+            this.hbState = hbState;
+            this.applicationState = applicationState;
+        }
+    }
+
+    private final AtomicReference<View> ref;
 
     /* fields below do not get serialized */
     private volatile long updateTimestamp;
@@ -54,46 +66,79 @@ public class EndpointState
 
     public EndpointState(HeartBeatState initialHbState)
     {
-        this(initialHbState, new EnumMap<ApplicationState, 
VersionedValue>(ApplicationState.class));
+        this(initialHbState, new EnumMap<>(ApplicationState.class));
     }
 
     public EndpointState(EndpointState other)
     {
-        this(new HeartBeatState(other.hbState), new 
EnumMap<>(other.applicationState.get()));
+        ref = new AtomicReference<>(other.ref.get());
+        updateTimestamp = System.nanoTime();
+        isAlive = true;
     }
 
-    EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
+    @VisibleForTesting
+    public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
     {
-        hbState = initialHbState;
-        applicationState = new AtomicReference<Map<ApplicationState, 
VersionedValue>>(new EnumMap<>(states));
+        ref = new AtomicReference<>(new View(initialHbState, new 
EnumMap<>(states)));
         updateTimestamp = System.nanoTime();
         isAlive = true;
     }
 
-    HeartBeatState getHeartBeatState()
+    @VisibleForTesting
+    public HeartBeatState getHeartBeatState()
+    {
+        return ref.get().hbState;
+    }
+
+    public void updateHeartBeat()
+    {
+        updateHeartBeat(HeartBeatState::updateHeartBeat);
+    }
+
+    public void forceNewerGenerationUnsafe()
+    {
+        updateHeartBeat(HeartBeatState::forceNewerGenerationUnsafe);
+    }
+
+    @VisibleForTesting
+    public void forceHighestPossibleVersionUnsafe()
     {
-        return hbState;
+        updateHeartBeat(HeartBeatState::forceHighestPossibleVersionUnsafe);
     }
 
-    void setHeartBeatState(HeartBeatState newHbState)
+    void unsafeSetEmptyHeartBeatState()
     {
-        updateTimestamp();
-        hbState = newHbState;
+        updateHeartBeat(ignore -> HeartBeatState.empty());
+    }
+
+    private void updateHeartBeat(Function<HeartBeatState, HeartBeatState> fn)
+    {
+        HeartBeatState previous = null;
+        HeartBeatState update = null;
+        while (true)
+        {
+            View view = ref.get();
+            if (previous == null || view.hbState != previous) // if this races 
with updating states then can avoid bumping versions
+                update = fn.apply(view.hbState);
+            if (ref.compareAndSet(view, new View(update, 
view.applicationState)))
+                return;
+            previous = view.hbState;
+        }
     }
 
     public VersionedValue getApplicationState(ApplicationState key)
     {
-        return applicationState.get().get(key);
+        return ref.get().applicationState.get(key);
     }
 
     public boolean containsApplicationState(ApplicationState key)
     {
-        return applicationState.get().containsKey(key);
+        return ref.get().applicationState.containsKey(key);
     }
 
     public Set<Map.Entry<ApplicationState, VersionedValue>> states()
     {
-        return applicationState.get().entrySet();
+        return ref.get().applicationState.entrySet();
     }
 
     public void addApplicationState(ApplicationState key, VersionedValue value)
@@ -107,17 +152,27 @@ public class EndpointState
     }
 
     public void addApplicationStates(Set<Map.Entry<ApplicationState, 
VersionedValue>> values)
+    {
+        addApplicationStates(values, null);
+    }
+
+    public void addApplicationStates(Set<Map.Entry<ApplicationState, 
VersionedValue>> values, @Nullable HeartBeatState hbState)
     {
         while (true)
         {
-            Map<ApplicationState, VersionedValue> orig = 
applicationState.get();
+            View view = this.ref.get();
+            Map<ApplicationState, VersionedValue> orig = view.applicationState;
             Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
 
             for (Map.Entry<ApplicationState, VersionedValue> value : values)
                 copy.put(value.getKey(), value.getValue());
 
-            if (applicationState.compareAndSet(orig, copy))
+            if (this.ref.compareAndSet(view, new View(hbState == null ? 
view.hbState : hbState, copy)))
+            {
+                if (hbState != null)
+                    updateTimestamp();
                 return;
+            }
         }
     }
 
@@ -125,18 +180,19 @@ public class EndpointState
     {
         while (hasLegacyFields())
         {
-            Map<ApplicationState, VersionedValue> orig = 
applicationState.get();
+            View view = ref.get();
+            Map<ApplicationState, VersionedValue> orig = view.applicationState;
             Map<ApplicationState, VersionedValue> updatedStates = 
filterMajorVersion3LegacyApplicationStates(orig);
             // avoid updating if no state is removed
             if (orig.size() == updatedStates.size()
-                || applicationState.compareAndSet(orig, updatedStates))
+                || ref.compareAndSet(view, new View(view.hbState, 
updatedStates)))
                 return;
         }
     }
 
     private boolean hasLegacyFields()
     {
-        Set<ApplicationState> statesPresent = applicationState.get().keySet();
+        Set<ApplicationState> statesPresent = 
ref.get().applicationState.keySet();
         if (statesPresent.isEmpty())
             return false;
         return (statesPresent.contains(ApplicationState.STATUS) && 
statesPresent.contains(ApplicationState.STATUS_WITH_PORT))
@@ -193,7 +249,7 @@ public class EndpointState
 
     public boolean isStateEmpty()
     {
-        return applicationState.get().isEmpty();
+        return ref.get().applicationState.isEmpty();
     }
 
     /**
@@ -201,8 +257,10 @@ public class EndpointState
      */
     public boolean isEmptyWithoutStatus()
     {
-        Map<ApplicationState, VersionedValue> state = applicationState.get();
-        return hbState.isEmpty() && 
!(state.containsKey(ApplicationState.STATUS_WITH_PORT) || 
state.containsKey(ApplicationState.STATUS));
+        View view = ref.get();
+        Map<ApplicationState, VersionedValue> state = view.applicationState;
+        boolean hasStatus = 
state.containsKey(ApplicationState.STATUS_WITH_PORT) || 
state.containsKey(ApplicationState.STATUS);
+        return view.hbState.isEmpty() && !hasStatus;
     }
 
     public boolean isRpcReady()
@@ -253,7 +311,8 @@ public class EndpointState
 
     public String toString()
     {
-        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = 
" + applicationState.get();
+        View view = ref.get();
+        return "EndpointState: HeartBeatState = " + view.hbState + ", 
AppStateMap = " + view.applicationState;
     }
 }
 
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 9a7a9935b8..12c532b162 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -310,7 +310,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 taskLock.lock();
 
                 /* Update the local heartbeat counter. */
-                
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
+                
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).updateHeartBeat();
                 if (logger.isTraceEnabled())
                     logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
                 final List<GossipDigest> gDigests = new ArrayList<>();
@@ -598,7 +598,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
shutdown);
         epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.shutdown(true));
         epState.addApplicationState(ApplicationState.RPC_READY, 
StorageService.instance.valueFactory.rpcReady(false));
-        epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
+        epState.forceHighestPossibleVersionUnsafe();
         markDead(endpoint, epState);
         FailureDetector.instance.forceConviction(endpoint);
         GossiperDiagnostics.markedAsShutdown(this, endpoint);
@@ -778,7 +778,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         // update the other node's generation to mimic it as if it had changed 
it itself
         logger.info("Advertising removal for {}", endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
-        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.forceNewerGenerationUnsafe();
         Map<ApplicationState, VersionedValue> states = new 
EnumMap<>(ApplicationState.class);
         states.put(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
         states.put(ApplicationState.STATUS, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
@@ -798,7 +798,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
-        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.forceNewerGenerationUnsafe();
         long expireTime = computeExpireTime();
         epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
         epState.addApplicationState(ApplicationState.STATUS, 
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
@@ -849,7 +849,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 else if (newState.getHeartBeatState().getHeartBeatVersion() != 
heartbeat)
                     throw new RuntimeException("Endpoint still alive: " + 
endpoint + " heartbeat changed while trying to assassinate it");
                 epState.updateTimestamp(); // make sure we don't evict it too 
soon
-                epState.getHeartBeatState().forceNewerGenerationUnsafe();
+                epState.forceNewerGenerationUnsafe();
             }
 
             Collection<Token> tokens = null;
@@ -1580,15 +1580,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         // don't assert here, since if the node restarts the version will go 
back to zero
         int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
 
-        localState.setHeartBeatState(remoteState.getHeartBeatState());
-        if (logger.isTraceEnabled())
-            logger.trace("Updating heartbeat state version to {} from {} for 
{} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, 
addr);
-
-        Set<Entry<ApplicationState, VersionedValue>> remoteStates = 
remoteState.states();
-        assert remoteState.getHeartBeatState().getGeneration() == 
localState.getHeartBeatState().getGeneration();
-
-
-        Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteStates.stream().filter(entry -> {
+        Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteState.states().stream().filter(entry -> {
             // filter out the states that are already up to date (has the same 
or higher version)
             VersionedValue local = 
localState.getApplicationState(entry.getKey());
             return (local == null || local.version < entry.getValue().version);
@@ -1601,7 +1593,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 logger.trace("Updating {} state version to {} for {}", 
entry.getKey().toString(), entry.getValue().version, addr);
             }
         }
-        localState.addApplicationStates(updatedStates);
+        localState.addApplicationStates(updatedStates, 
remoteState.getHeartBeatState());
+        if (logger.isTraceEnabled())
+            logger.trace("Updating heartbeat state version to {} from {} for 
{} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, 
addr);
 
         // get rid of legacy fields once the cluster is not in mixed mode
         if (!hasMajorVersion3OrUnknownNodes())
@@ -1983,7 +1977,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     public void forceNewerGeneration()
     {
         EndpointState epstate = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
-        epstate.getHeartBeatState().forceNewerGenerationUnsafe();
+        epstate.forceNewerGenerationUnsafe();
     }
 
 
@@ -2004,7 +1998,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         if (epState != null)
         {
             logger.debug("not replacing a previous epState for {}, but reusing 
it: {}", ep, epState);
-            epState.setHeartBeatState(HeartBeatState.empty());
+            epState.unsafeSetEmptyHeartBeatState();
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java 
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 75f4f56ea7..374d346a0a 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -33,8 +33,8 @@ public class HeartBeatState
 
     public static final IVersionedSerializer<HeartBeatState> serializer = new 
HeartBeatStateSerializer();
 
-    private volatile int generation;
-    private volatile int version;
+    private final int generation;
+    private final int version;
 
     HeartBeatState(int gen)
     {
@@ -67,29 +67,29 @@ public class HeartBeatState
         return version == EMPTY_VERSION;
     }
 
-    int getGeneration()
+    public int getGeneration()
     {
         return generation;
     }
 
-    void updateHeartBeat()
+    HeartBeatState updateHeartBeat()
     {
-        version = VersionGenerator.getNextVersion();
+        return new HeartBeatState(generation, 
VersionGenerator.getNextVersion());
     }
 
-    int getHeartBeatVersion()
+    public int getHeartBeatVersion()
     {
         return version;
     }
 
-    void forceNewerGenerationUnsafe()
+    HeartBeatState forceNewerGenerationUnsafe()
     {
-        generation += 1;
+        return new HeartBeatState(generation + 1, version);
     }
 
-    void forceHighestPossibleVersionUnsafe()
+    HeartBeatState forceHighestPossibleVersionUnsafe()
     {
-        version = Integer.MAX_VALUE;
+        return new HeartBeatState(generation, Integer.MAX_VALUE);
     }
 
     public String toString()
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java 
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 96730baa19..778d4a6b34 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -304,7 +304,7 @@ public class GossiperTest
             proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
 
             // Bump the heartbeat version and use the same TOKENS state
-            proposedRemoteHeartBeat.updateHeartBeat();
+            proposedRemoteState.updateHeartBeat();
             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, 
tokensValue);
 
             // The following state change should only update heartbeat without 
updating the TOKENS state
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java 
b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 90ce10ba0d..6ac4729ae1 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -130,7 +130,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
         private static List<GossipDigest> Digests = new 
ArrayList<GossipDigest>();
 
         {
-            HeartbeatSt.updateHeartBeat();
+            EndpointSt.updateHeartBeat();
             EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
             EndpointSt.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
vv1);
             for (int i = 0; i < 100; i++)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to