This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a739e795de01a3fd8b0dea87774a42eb135d81e6
Merge: fefb0b2595 ac05c94e6a
Author: David Capwell <[email protected]>
AuthorDate: Thu May 22 14:15:55 2025 -0700

    Merge branch 'cassandra-5.0' into trunk

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/gms/EndpointState.java    | 97 +++++++++++++++++-----
 src/java/org/apache/cassandra/gms/Gossiper.java    | 21 ++---
 .../org/apache/cassandra/gms/HeartBeatState.java   | 20 ++---
 .../distributed/impl/UnsafeGossipHelper.java       |  2 +-
 .../org/apache/cassandra/gms/GossiperTest.java     |  2 +-
 .../apache/cassandra/gms/SerializationsTest.java   |  2 +-
 7 files changed, 94 insertions(+), 51 deletions(-)

diff --cc CHANGES.txt
index 7311a72405,8c64d012f0..0f123645d9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -254,10 -48,7 +254,11 @@@ Merged from 4.1
   * Optionally skip exception logging on invalid legacy protocol magic 
exception (CASSANDRA-19483)
   * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202)
   * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking 
thread is interrupted (CASSANDRA-20084)
 + * Run audit_logging_options through santiation and validation on startup 
(CASSANDRA-20208)
 + * Enforce CQL message size limit on multiframe messages (CASSANDRA-20052)
 + * Fix race condition in DecayingEstimatedHistogramReservoir during rescale 
(CASSANDRA-19365)
  Merged from 4.0:
++ * Gossip doesn't converge due to race condition when updating EndpointStates 
multiple fields (CASSANDRA-20659)
   * Handle sstable metadata stats file getting a new mtime after compaction 
has finished (CASSANDRA-18119)
   * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
   * Updating a column with a new TTL but same expiration time is 
non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --cc src/java/org/apache/cassandra/gms/EndpointState.java
index c96f99da2e,7955fd664d..26fe33ec69
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@@ -82,13 -94,43 +95,38 @@@ public class EndpointStat
      @VisibleForTesting
      public HeartBeatState getHeartBeatState()
      {
-         return hbState;
+         return ref.get().hbState;
+     }
+ 
+     public void updateHeartBeat()
+     {
+         updateHeartBeat(HeartBeatState::updateHeartBeat);
      }
  
-     void setHeartBeatState(HeartBeatState newHbState)
+     public void forceNewerGenerationUnsafe()
      {
-         updateTimestamp();
-         hbState = newHbState;
+         updateHeartBeat(HeartBeatState::forceNewerGenerationUnsafe);
+     }
+ 
+     @VisibleForTesting
+     public void forceHighestPossibleVersionUnsafe()
+     {
+         updateHeartBeat(HeartBeatState::forceHighestPossibleVersionUnsafe);
+     }
+ 
 -    void unsafeSetEmptyHeartBeatState()
 -    {
 -        updateHeartBeat(ignore -> HeartBeatState.empty());
 -    }
 -
+     private void updateHeartBeat(Function<HeartBeatState, HeartBeatState> fn)
+     {
+         HeartBeatState previous = null;
+         HeartBeatState update = null;
+         while (true)
+         {
+             View view = ref.get();
+             if (previous == null || view.hbState != previous) // if this 
races with updating states then can avoid bumping versions
+                 update = fn.apply(view.hbState);
+             if (ref.compareAndSet(view, new View(update, 
view.applicationState)))
+                 return;
+             previous = view.hbState;
+         }
      }
  
      public VersionedValue getApplicationState(ApplicationState key)
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index b84b1f25cb,3d4bc95cac..d6aebf1ba8
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -1460,8 -1768,13 +1453,10 @@@ public class Gossiper implements IFailu
                  logger.trace("Updating {} state version to {} for {}", 
entry.getKey().toString(), entry.getValue().version, addr);
              }
          }
-         localState.addApplicationStates(updatedStates);
+         localState.addApplicationStates(updatedStates, 
remoteState.getHeartBeatState());
+         if (logger.isTraceEnabled())
+             logger.trace("Updating heartbeat state version to {} from {} for 
{} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, 
addr);
 -
 -        // get rid of legacy fields once the cluster is not in mixed mode
 -        if (!hasMajorVersion3Nodes)
 -            localState.removeMajorVersion3LegacyApplicationStates();
 +        localState.removeMajorVersion3LegacyApplicationStates();
  
          // need to run STATUS or STATUS_WITH_PORT first to handle 
BOOT_REPLACE correctly (else won't be a member, so TOKENS won't be processed)
          for (Entry<ApplicationState, VersionedValue> updatedEntry : 
updatedStates)
@@@ -1765,9 -2171,42 +1760,9 @@@
      public void forceNewerGeneration()
      {
          EndpointState epstate = 
endpointStateMap.get(getBroadcastAddressAndPort());
-         epstate.getHeartBeatState().forceNewerGenerationUnsafe();
+         epstate.forceNewerGenerationUnsafe();
      }
  
 -
 -    /**
 -     * Add an endpoint we knew about previously, but whose state is unknown
 -     */
 -    public void addSavedEndpoint(InetAddressAndPort ep)
 -    {
 -        checkProperThreadForStateMutation();
 -        if (ep.equals(getBroadcastAddressAndPort()))
 -        {
 -            logger.debug("Attempt to add self as saved endpoint");
 -            return;
 -        }
 -
 -        //preserve any previously known, in-memory data about the endpoint 
(such as DC, RACK, and so on)
 -        EndpointState epState = endpointStateMap.get(ep);
 -        if (epState != null)
 -        {
 -            logger.debug("not replacing a previous epState for {}, but 
reusing it: {}", ep, epState);
 -            epState.unsafeSetEmptyHeartBeatState();
 -        }
 -        else
 -        {
 -            epState = new EndpointState(HeartBeatState.empty());
 -            logger.info("Adding {} as there was no previous epState; new 
state is {}", ep, epState);
 -        }
 -
 -        epState.markDead();
 -        endpointStateMap.put(ep, epState);
 -        silentlyMarkDead(ep, epState);
 -        if (logger.isTraceEnabled())
 -            logger.trace("Adding saved endpoint {} {}", ep, 
epState.getHeartBeatState().getGeneration());
 -    }
 -
      private void addLocalApplicationStateInternal(ApplicationState state, 
VersionedValue value)
      {
          assert taskLock.isHeldByCurrentThread();
diff --cc test/unit/org/apache/cassandra/gms/SerializationsTest.java
index c7320b194e,e23fdf4928..28a7714466
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@@ -131,9 -128,9 +131,9 @@@ public class SerializationsTest extend
          private static VersionedValue vv0 = vvFact.load(23d);
          private static VersionedValue vv1 = 
vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken()));
          private static List<GossipDigest> Digests = new 
ArrayList<GossipDigest>();
 -
 +        static
          {
-             HeartbeatSt.updateHeartBeat();
+             EndpointSt.updateHeartBeat();
              EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
              EndpointSt.addApplicationState(ApplicationState.STATUS_WITH_PORT, 
vv1);
              for (int i = 0; i < 100; i++)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to