This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new c736d22cf8 Gossip doesn't converge due to race condition when updating EndpointStates multiple fields
c736d22cf8 is described below
commit c736d22cf855d0981bb514fce5cb3a149b8c2f43
Author: David Capwell <[email protected]>
AuthorDate: Wed May 21 14:39:45 2025 -0700
Gossip doesn't converge due to race condition when updating EndpointStates multiple fields
patch by David Capwell, Matt Byrd; reviewed by Blake Eggleston, Brandon Williams for CASSANDRA-20659
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 111 ++++++++++++++++-----
src/java/org/apache/cassandra/gms/Gossiper.java | 28 ++----
.../org/apache/cassandra/gms/HeartBeatState.java | 20 ++--
.../org/apache/cassandra/gms/GossiperTest.java | 2 +-
.../apache/cassandra/gms/SerializationsTest.java | 2 +-
6 files changed, 109 insertions(+), 55 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 65e3cb4de0..676522b9bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.18
+ * Gossip doesn't converge due to race condition when updating EndpointStates
multiple fields (CASSANDRA-20659)
* Handle sstable metadata stats file getting a new mtime after compaction has
finished (CASSANDRA-18119)
* Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
* Updating a column with a new TTL but same expiration time is
non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 782a72207c..6cec0cef34 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -24,6 +24,9 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.base.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,16 +40,25 @@ import org.apache.cassandra.utils.CassandraVersion;
* This abstraction represents both the HeartBeatState and the
ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this
instance.
*/
-
-
public class EndpointState
{
protected static final Logger logger =
LoggerFactory.getLogger(EndpointState.class);
public final static IVersionedSerializer<EndpointState> serializer = new
EndpointStateSerializer();
- private volatile HeartBeatState hbState;
- private final AtomicReference<Map<ApplicationState, VersionedValue>>
applicationState;
+ private static class View
+ {
+ final HeartBeatState hbState;
+ final Map<ApplicationState, VersionedValue> applicationState;
+
+ private View(HeartBeatState hbState, Map<ApplicationState,
VersionedValue> applicationState)
+ {
+ this.hbState = hbState;
+ this.applicationState = applicationState;
+ }
+ }
+
+ private final AtomicReference<View> ref;
/* fields below do not get serialized */
private volatile long updateTimestamp;
@@ -54,46 +66,79 @@ public class EndpointState
public EndpointState(HeartBeatState initialHbState)
{
- this(initialHbState, new EnumMap<ApplicationState,
VersionedValue>(ApplicationState.class));
+ this(initialHbState, new EnumMap<>(ApplicationState.class));
}
public EndpointState(EndpointState other)
{
- this(new HeartBeatState(other.hbState), new
EnumMap<>(other.applicationState.get()));
+ ref = new AtomicReference<>(other.ref.get());
+ updateTimestamp = System.nanoTime();
+ isAlive = true;
}
- EndpointState(HeartBeatState initialHbState, Map<ApplicationState,
VersionedValue> states)
+ @VisibleForTesting
+ public EndpointState(HeartBeatState initialHbState, Map<ApplicationState,
VersionedValue> states)
{
- hbState = initialHbState;
- applicationState = new AtomicReference<Map<ApplicationState,
VersionedValue>>(new EnumMap<>(states));
+ ref = new AtomicReference<>(new View(initialHbState, new
EnumMap<>(states)));
updateTimestamp = System.nanoTime();
isAlive = true;
}
- HeartBeatState getHeartBeatState()
+ @VisibleForTesting
+ public HeartBeatState getHeartBeatState()
+ {
+ return ref.get().hbState;
+ }
+
+ public void updateHeartBeat()
+ {
+ updateHeartBeat(HeartBeatState::updateHeartBeat);
+ }
+
+ public void forceNewerGenerationUnsafe()
+ {
+ updateHeartBeat(HeartBeatState::forceNewerGenerationUnsafe);
+ }
+
+ @VisibleForTesting
+ public void forceHighestPossibleVersionUnsafe()
{
- return hbState;
+ updateHeartBeat(HeartBeatState::forceHighestPossibleVersionUnsafe);
}
- void setHeartBeatState(HeartBeatState newHbState)
+ void unsafeSetEmptyHeartBeatState()
{
- updateTimestamp();
- hbState = newHbState;
+ updateHeartBeat(ignore -> HeartBeatState.empty());
+ }
+
+ private void updateHeartBeat(Function<HeartBeatState, HeartBeatState> fn)
+ {
+ HeartBeatState previous = null;
+ HeartBeatState update = null;
+ while (true)
+ {
+ View view = ref.get();
+ if (previous == null || view.hbState != previous) // if this races
with updating states then can avoid bumping versions
+ update = fn.apply(view.hbState);
+ if (ref.compareAndSet(view, new View(update,
view.applicationState)))
+ return;
+ previous = view.hbState;
+ }
}
public VersionedValue getApplicationState(ApplicationState key)
{
- return applicationState.get().get(key);
+ return ref.get().applicationState.get(key);
}
public boolean containsApplicationState(ApplicationState key)
{
- return applicationState.get().containsKey(key);
+ return ref.get().applicationState.containsKey(key);
}
public Set<Map.Entry<ApplicationState, VersionedValue>> states()
{
- return applicationState.get().entrySet();
+ return ref.get().applicationState.entrySet();
}
public void addApplicationState(ApplicationState key, VersionedValue value)
@@ -107,17 +152,27 @@ public class EndpointState
}
public void addApplicationStates(Set<Map.Entry<ApplicationState,
VersionedValue>> values)
+ {
+ addApplicationStates(values, null);
+ }
+
+ public void addApplicationStates(Set<Map.Entry<ApplicationState,
VersionedValue>> values, @Nullable HeartBeatState hbState)
{
while (true)
{
- Map<ApplicationState, VersionedValue> orig =
applicationState.get();
+ View view = this.ref.get();
+ Map<ApplicationState, VersionedValue> orig = view.applicationState;
Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
for (Map.Entry<ApplicationState, VersionedValue> value : values)
copy.put(value.getKey(), value.getValue());
- if (applicationState.compareAndSet(orig, copy))
+ if (this.ref.compareAndSet(view, new View(hbState == null ?
view.hbState : hbState, copy)))
+ {
+ if (hbState != null)
+ updateTimestamp();
return;
+ }
}
}
@@ -125,18 +180,19 @@ public class EndpointState
{
while (hasLegacyFields())
{
- Map<ApplicationState, VersionedValue> orig =
applicationState.get();
+ View view = ref.get();
+ Map<ApplicationState, VersionedValue> orig = view.applicationState;
Map<ApplicationState, VersionedValue> updatedStates =
filterMajorVersion3LegacyApplicationStates(orig);
// avoid updating if no state is removed
if (orig.size() == updatedStates.size()
- || applicationState.compareAndSet(orig, updatedStates))
+ || ref.compareAndSet(view, new View(view.hbState,
updatedStates)))
return;
}
}
private boolean hasLegacyFields()
{
- Set<ApplicationState> statesPresent = applicationState.get().keySet();
+ Set<ApplicationState> statesPresent =
ref.get().applicationState.keySet();
if (statesPresent.isEmpty())
return false;
return (statesPresent.contains(ApplicationState.STATUS) &&
statesPresent.contains(ApplicationState.STATUS_WITH_PORT))
@@ -193,7 +249,7 @@ public class EndpointState
public boolean isStateEmpty()
{
- return applicationState.get().isEmpty();
+ return ref.get().applicationState.isEmpty();
}
/**
@@ -201,8 +257,10 @@ public class EndpointState
*/
public boolean isEmptyWithoutStatus()
{
- Map<ApplicationState, VersionedValue> state = applicationState.get();
- return hbState.isEmpty() &&
!(state.containsKey(ApplicationState.STATUS_WITH_PORT) ||
state.containsKey(ApplicationState.STATUS));
+ View view = ref.get();
+ Map<ApplicationState, VersionedValue> state = view.applicationState;
+ boolean hasStatus =
state.containsKey(ApplicationState.STATUS_WITH_PORT) ||
state.containsKey(ApplicationState.STATUS);
+ return view.hbState.isEmpty() && !hasStatus;
}
public boolean isRpcReady()
@@ -253,7 +311,8 @@ public class EndpointState
public String toString()
{
- return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap =
" + applicationState.get();
+ View view = ref.get();
+ return "EndpointState: HeartBeatState = " + view.hbState + ",
AppStateMap = " + view.applicationState;
}
}
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 9a7a9935b8..12c532b162 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -310,7 +310,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
taskLock.lock();
/* Update the local heartbeat counter. */
-
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
+
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).updateHeartBeat();
if (logger.isTraceEnabled())
logger.trace("My heartbeat is now {}",
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
final List<GossipDigest> gDigests = new ArrayList<>();
@@ -598,7 +598,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
epState.addApplicationState(ApplicationState.STATUS_WITH_PORT,
shutdown);
epState.addApplicationState(ApplicationState.STATUS,
StorageService.instance.valueFactory.shutdown(true));
epState.addApplicationState(ApplicationState.RPC_READY,
StorageService.instance.valueFactory.rpcReady(false));
- epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
+ epState.forceHighestPossibleVersionUnsafe();
markDead(endpoint, epState);
FailureDetector.instance.forceConviction(endpoint);
GossiperDiagnostics.markedAsShutdown(this, endpoint);
@@ -778,7 +778,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
// update the other node's generation to mimic it as if it had changed
it itself
logger.info("Advertising removal for {}", endpoint);
epState.updateTimestamp(); // make sure we don't evict it too soon
- epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.forceNewerGenerationUnsafe();
Map<ApplicationState, VersionedValue> states = new
EnumMap<>(ApplicationState.class);
states.put(ApplicationState.STATUS_WITH_PORT,
StorageService.instance.valueFactory.removingNonlocal(hostId));
states.put(ApplicationState.STATUS,
StorageService.instance.valueFactory.removingNonlocal(hostId));
@@ -798,7 +798,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
{
EndpointState epState = endpointStateMap.get(endpoint);
epState.updateTimestamp(); // make sure we don't evict it too soon
- epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.forceNewerGenerationUnsafe();
long expireTime = computeExpireTime();
epState.addApplicationState(ApplicationState.STATUS_WITH_PORT,
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
epState.addApplicationState(ApplicationState.STATUS,
StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
@@ -849,7 +849,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
else if (newState.getHeartBeatState().getHeartBeatVersion() !=
heartbeat)
throw new RuntimeException("Endpoint still alive: " +
endpoint + " heartbeat changed while trying to assassinate it");
epState.updateTimestamp(); // make sure we don't evict it too
soon
- epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.forceNewerGenerationUnsafe();
}
Collection<Token> tokens = null;
@@ -1580,15 +1580,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
// don't assert here, since if the node restarts the version will go
back to zero
int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
- localState.setHeartBeatState(remoteState.getHeartBeatState());
- if (logger.isTraceEnabled())
- logger.trace("Updating heartbeat state version to {} from {} for
{} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion,
addr);
-
- Set<Entry<ApplicationState, VersionedValue>> remoteStates =
remoteState.states();
- assert remoteState.getHeartBeatState().getGeneration() ==
localState.getHeartBeatState().getGeneration();
-
-
- Set<Entry<ApplicationState, VersionedValue>> updatedStates =
remoteStates.stream().filter(entry -> {
+ Set<Entry<ApplicationState, VersionedValue>> updatedStates =
remoteState.states().stream().filter(entry -> {
// filter out the states that are already up to date (has the same
or higher version)
VersionedValue local =
localState.getApplicationState(entry.getKey());
return (local == null || local.version < entry.getValue().version);
@@ -1601,7 +1593,9 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
logger.trace("Updating {} state version to {} for {}",
entry.getKey().toString(), entry.getValue().version, addr);
}
}
- localState.addApplicationStates(updatedStates);
+ localState.addApplicationStates(updatedStates,
remoteState.getHeartBeatState());
+ if (logger.isTraceEnabled())
+ logger.trace("Updating heartbeat state version to {} from {} for
{} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion,
addr);
// get rid of legacy fields once the cluster is not in mixed mode
if (!hasMajorVersion3OrUnknownNodes())
@@ -1983,7 +1977,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
public void forceNewerGeneration()
{
EndpointState epstate =
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
- epstate.getHeartBeatState().forceNewerGenerationUnsafe();
+ epstate.forceNewerGenerationUnsafe();
}
@@ -2004,7 +1998,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
if (epState != null)
{
logger.debug("not replacing a previous epState for {}, but reusing
it: {}", ep, epState);
- epState.setHeartBeatState(HeartBeatState.empty());
+ epState.unsafeSetEmptyHeartBeatState();
}
else
{
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 75f4f56ea7..374d346a0a 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -33,8 +33,8 @@ public class HeartBeatState
public static final IVersionedSerializer<HeartBeatState> serializer = new
HeartBeatStateSerializer();
- private volatile int generation;
- private volatile int version;
+ private final int generation;
+ private final int version;
HeartBeatState(int gen)
{
@@ -67,29 +67,29 @@ public class HeartBeatState
return version == EMPTY_VERSION;
}
- int getGeneration()
+ public int getGeneration()
{
return generation;
}
- void updateHeartBeat()
+ HeartBeatState updateHeartBeat()
{
- version = VersionGenerator.getNextVersion();
+ return new HeartBeatState(generation,
VersionGenerator.getNextVersion());
}
- int getHeartBeatVersion()
+ public int getHeartBeatVersion()
{
return version;
}
- void forceNewerGenerationUnsafe()
+ HeartBeatState forceNewerGenerationUnsafe()
{
- generation += 1;
+ return new HeartBeatState(generation + 1, version);
}
- void forceHighestPossibleVersionUnsafe()
+ HeartBeatState forceHighestPossibleVersionUnsafe()
{
- version = Integer.MAX_VALUE;
+ return new HeartBeatState(generation, Integer.MAX_VALUE);
}
public String toString()
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 96730baa19..778d4a6b34 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -304,7 +304,7 @@ public class GossiperTest
proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
// Bump the heartbeat version and use the same TOKENS state
- proposedRemoteHeartBeat.updateHeartBeat();
+ proposedRemoteState.updateHeartBeat();
proposedRemoteState.addApplicationState(ApplicationState.TOKENS,
tokensValue);
// The following state change should only update heartbeat without
updating the TOKENS state
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 90ce10ba0d..6ac4729ae1 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -130,7 +130,7 @@ public class SerializationsTest extends
AbstractSerializationsTester
private static List<GossipDigest> Digests = new
ArrayList<GossipDigest>();
{
- HeartbeatSt.updateHeartBeat();
+ EndpointSt.updateHeartBeat();
EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
EndpointSt.addApplicationState(ApplicationState.STATUS_WITH_PORT,
vv1);
for (int i = 0; i < 100; i++)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org