This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 66b973341a Accord: Retry epoch/topology metadata fetch on all peer nodes
66b973341a is described below

commit 66b973341a540fe4d325427ea3f706bf524a4d5e
Author: Alex Petrov <[email protected]>
AuthorDate: Tue May 20 12:12:01 2025 +0200

    Accord: Retry epoch/topology metadata fetch on all peer nodes
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20663
---
 .../org/apache/cassandra/gms/FailureDetector.java  | 10 ++++-
 .../cassandra/service/accord/AccordService.java    | 51 ++++++++++------------
 .../cassandra/simulator/paxos/PaxosSimulation.java |  5 ++-
 3 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 49b2089297..da90efc332 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -326,7 +326,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
             // an error in that case.
             ClusterMetadata metadata = ClusterMetadata.current();
             if (!metadata.directory.allJoinedEndpoints().contains(ep) && 
!metadata.fullCMSMembers().contains(ep))
-                logger.error("Unknown endpoint: " + ep, new 
IllegalArgumentException("Unknown endpoint: " + ep));
+                logger.error("Unknown endpoint: " + ep, new 
UnknownEndpointException(ep));
         }
         return epState != null && epState.isAlive();
     }
@@ -437,6 +437,14 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
         
sb.append("-----------------------------------------------------------------------");
         return sb.toString();
     }
+
+    public static class UnknownEndpointException extends 
IllegalArgumentException
+    {
+        public UnknownEndpointException(InetAddressAndPort ep)
+        {
+            super("Unknown endpoint: " + ep);
+        }
+    }
 }
 
 /*
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index acb4c65e8b..48c92e6296 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -448,35 +448,30 @@ public class AccordService implements IAccordService, Shutdownable
         if (peers.isEmpty())
             return null;
 
-        Iterator<InetAddressAndPort> iter = peers.iterator();
-        while (iter.hasNext())
+        try
         {
-            InetAddressAndPort peer = iter.next();
-            try
-            {
-                logger.info("Fetching topologies for epochs [{}, {}] from {}", 
from, metadata.epoch.getEpoch(), peer);
-                Invariants.require(from <= metadata.epoch.getEpoch(),
-                                   "Accord epochs should never be ahead of TCM 
ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
-
-                Future<TopologyRange> futures = 
FetchTopologies.fetch(SharedContext.Global.instance,
-                                                                      
Collections.singleton(peer),
-                                                                      from,
-                                                                      
Long.MAX_VALUE);
-                TopologyRange response = futures.get();
-                logger.info("Fetched topologies {}", response);
-
-                // We're behind and need to catch up CMS first.
-                if (response.current > 
ClusterMetadata.current().epoch.getEpoch())
-                    
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(response.current));
-
-                if (response.current >= from)
-                    return response;
-                metadata = ClusterMetadata.current();
-            }
-            catch (Throwable e)
-            {
-                logger.info("Failed to fetch epochs [{}, {}] from {}", from, 
metadata.epoch.getEpoch(), peer);
-            }
+            logger.info("Fetching topologies for epochs [{}, {}] from {}", 
from, metadata.epoch.getEpoch(), peers);
+            Invariants.require(from <= metadata.epoch.getEpoch(),
+                               "Accord epochs should never be ahead of TCM 
ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
+
+            Future<TopologyRange> futures = 
FetchTopologies.fetch(SharedContext.Global.instance,
+                                                                  peers,
+                                                                  from,
+                                                                  
Long.MAX_VALUE);
+            TopologyRange response = futures.get();
+            logger.info("Fetched topologies {}", response);
+
+            // We're behind and need to catch up CMS first.
+            if (response.current > ClusterMetadata.current().epoch.getEpoch())
+                
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(response.current));
+
+            if (response.current >= from)
+                return response;
+            metadata = ClusterMetadata.current();
+        }
+        catch (Throwable e)
+        {
+            logger.info("Failed to fetch epochs [{}, {}] from {}", from, 
metadata.epoch.getEpoch(), peers);
         }
 
         // After trying to contact all peers, and retrying according to retry 
spec on them, we give up.
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index fbc590c942..e58e691962 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -35,6 +35,7 @@ import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Throwables;
+import org.apache.cassandra.gms.FailureDetector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +86,8 @@ public abstract class PaxosSimulation implements Simulation, ClusterActionListen
     protected Class<? extends Throwable>[] expectedExceptionsPaxos()
     {
         return (Class<? extends Throwable>[]) new Class<?>[] { 
RequestExecutionException.class,
-                                                               
CancellationException.class };
+                                                               
CancellationException.class,
+                                                               
FailureDetector.UnknownEndpointException.class};
     }
 
     @SuppressWarnings("unchecked")
@@ -97,6 +99,7 @@ public abstract class PaxosSimulation implements Simulation, ClusterActionListen
                                                                
CancellationException.class,
                                                                
CoordinationFailed.class,
                                                                
ClosedChannelException.class,
+                                                               
FailureDetector.UnknownEndpointException.class,
                                                                
StreamReceivedOutOfTokenRangeException.class // should always come in 
combination with closed channel exception
         };
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to