This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a739e795de01a3fd8b0dea87774a42eb135d81e6 Merge: fefb0b2595 ac05c94e6a Author: David Capwell <[email protected]> AuthorDate: Thu May 22 14:15:55 2025 -0700 Merge branch 'cassandra-5.0' into trunk CHANGES.txt | 1 + .../org/apache/cassandra/gms/EndpointState.java | 97 +++++++++++++++++----- src/java/org/apache/cassandra/gms/Gossiper.java | 21 ++--- .../org/apache/cassandra/gms/HeartBeatState.java | 20 ++--- .../distributed/impl/UnsafeGossipHelper.java | 2 +- .../org/apache/cassandra/gms/GossiperTest.java | 2 +- .../apache/cassandra/gms/SerializationsTest.java | 2 +- 7 files changed, 94 insertions(+), 51 deletions(-) diff --cc CHANGES.txt index 7311a72405,8c64d012f0..0f123645d9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -254,10 -48,7 +254,11 @@@ Merged from 4.1 * Optionally skip exception logging on invalid legacy protocol magic exception (CASSANDRA-19483) * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202) * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking thread is interrupted (CASSANDRA-20084) + * Run audit_logging_options through santiation and validation on startup (CASSANDRA-20208) + * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052) + * Fix race condition in DecayingEstimatedHistogramReservoir during rescale (CASSANDRA-19365) Merged from 4.0: ++ * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659) * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119) * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532) * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561) diff --cc src/java/org/apache/cassandra/gms/EndpointState.java index c96f99da2e,7955fd664d..26fe33ec69 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@@ -82,13 -94,43 +95,38 @@@ public class EndpointStat @VisibleForTesting public HeartBeatState getHeartBeatState() { - return hbState; + return ref.get().hbState; + } + + public void updateHeartBeat() + { + updateHeartBeat(HeartBeatState::updateHeartBeat); } - void setHeartBeatState(HeartBeatState newHbState) + public void forceNewerGenerationUnsafe() { - updateTimestamp(); - hbState = newHbState; + updateHeartBeat(HeartBeatState::forceNewerGenerationUnsafe); + } + + @VisibleForTesting + public void forceHighestPossibleVersionUnsafe() + { + updateHeartBeat(HeartBeatState::forceHighestPossibleVersionUnsafe); + } + - void unsafeSetEmptyHeartBeatState() - { - updateHeartBeat(ignore -> HeartBeatState.empty()); - } - + private void updateHeartBeat(Function<HeartBeatState, HeartBeatState> fn) + { + HeartBeatState previous = null; + HeartBeatState update = null; + while (true) + { + View view = ref.get(); + if (previous == null || view.hbState != previous) // if this races with updating states then can avoid bumping versions + update = fn.apply(view.hbState); + if (ref.compareAndSet(view, new View(update, view.applicationState))) + return; + previous = view.hbState; + } } public VersionedValue getApplicationState(ApplicationState key) diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index b84b1f25cb,3d4bc95cac..d6aebf1ba8 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -1460,8 -1768,13 +1453,10 @@@ public class Gossiper implements IFailu logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr); } } - localState.addApplicationStates(updatedStates); + localState.addApplicationStates(updatedStates, remoteState.getHeartBeatState()); + if (logger.isTraceEnabled()) + logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr); - - // get rid of legacy fields once the cluster is not in mixed mode - if (!hasMajorVersion3Nodes) - localState.removeMajorVersion3LegacyApplicationStates(); + localState.removeMajorVersion3LegacyApplicationStates(); // need to run STATUS or STATUS_WITH_PORT first to handle BOOT_REPLACE correctly (else won't be a member, so TOKENS won't be processed) for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates) @@@ -1765,9 -2171,42 +1760,9 @@@ public void forceNewerGeneration() { EndpointState epstate = endpointStateMap.get(getBroadcastAddressAndPort()); - epstate.getHeartBeatState().forceNewerGenerationUnsafe(); + epstate.forceNewerGenerationUnsafe(); } - - /** - * Add an endpoint we knew about previously, but whose state is unknown - */ - public void addSavedEndpoint(InetAddressAndPort ep) - { - checkProperThreadForStateMutation(); - if (ep.equals(getBroadcastAddressAndPort())) - { - logger.debug("Attempt to add self as saved endpoint"); - return; - } - - //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) - EndpointState epState = endpointStateMap.get(ep); - if (epState != null) - { - logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, epState); - epState.unsafeSetEmptyHeartBeatState(); - } - else - { - epState = new EndpointState(HeartBeatState.empty()); - logger.info("Adding {} as there was no previous epState; new state is {}", ep, epState); - } - - epState.markDead(); - endpointStateMap.put(ep, epState); - silentlyMarkDead(ep, epState); - if (logger.isTraceEnabled()) - logger.trace("Adding saved endpoint {} {}", ep, epState.getHeartBeatState().getGeneration()); - } - private void addLocalApplicationStateInternal(ApplicationState state, VersionedValue value) { assert taskLock.isHeldByCurrentThread(); diff --cc test/unit/org/apache/cassandra/gms/SerializationsTest.java index c7320b194e,e23fdf4928..28a7714466 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@@ -131,9 -128,9 +131,9 @@@ public class SerializationsTest extend private static VersionedValue vv0 = vvFact.load(23d); private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken())); private static List<GossipDigest> Digests = new ArrayList<GossipDigest>(); - + static { - HeartbeatSt.updateHeartBeat(); + EndpointSt.updateHeartBeat(); EndpointSt.addApplicationState(ApplicationState.LOAD, vv0); EndpointSt.addApplicationState(ApplicationState.STATUS_WITH_PORT, vv1); for (int i = 0; i < 100; i++) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
