This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 66b973341a Accord: Retry epoch/topology metadata fetch on all peer nodes
66b973341a is described below
commit 66b973341a540fe4d325427ea3f706bf524a4d5e
Author: Alex Petrov <[email protected]>
AuthorDate: Tue May 20 12:12:01 2025 +0200
Accord: Retry epoch/topology metadata fetch on all peer nodes
Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20663
---
.../org/apache/cassandra/gms/FailureDetector.java | 10 ++++-
.../cassandra/service/accord/AccordService.java | 51 ++++++++++------------
.../cassandra/simulator/paxos/PaxosSimulation.java | 5 ++-
3 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 49b2089297..da90efc332 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -326,7 +326,7 @@ public class FailureDetector implements IFailureDetector,
FailureDetectorMBean
// an error in that case.
ClusterMetadata metadata = ClusterMetadata.current();
if (!metadata.directory.allJoinedEndpoints().contains(ep) &&
!metadata.fullCMSMembers().contains(ep))
- logger.error("Unknown endpoint: " + ep, new
IllegalArgumentException("Unknown endpoint: " + ep));
+ logger.error("Unknown endpoint: " + ep, new
UnknownEndpointException(ep));
}
return epState != null && epState.isAlive();
}
@@ -437,6 +437,14 @@ public class FailureDetector implements IFailureDetector,
FailureDetectorMBean
sb.append("-----------------------------------------------------------------------");
return sb.toString();
}
+
+ public static class UnknownEndpointException extends
IllegalArgumentException
+ {
+ public UnknownEndpointException(InetAddressAndPort ep)
+ {
+ super("Unknown endpoint: " + ep);
+ }
+ }
}
/*
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index acb4c65e8b..48c92e6296 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -448,35 +448,30 @@ public class AccordService implements IAccordService,
Shutdownable
if (peers.isEmpty())
return null;
- Iterator<InetAddressAndPort> iter = peers.iterator();
- while (iter.hasNext())
+ try
{
- InetAddressAndPort peer = iter.next();
- try
- {
- logger.info("Fetching topologies for epochs [{}, {}] from {}",
from, metadata.epoch.getEpoch(), peer);
- Invariants.require(from <= metadata.epoch.getEpoch(),
- "Accord epochs should never be ahead of TCM
ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
-
- Future<TopologyRange> futures =
FetchTopologies.fetch(SharedContext.Global.instance,
-
Collections.singleton(peer),
- from,
-
Long.MAX_VALUE);
- TopologyRange response = futures.get();
- logger.info("Fetched topologies {}", response);
-
- // We're behind and need to catch up CMS first.
- if (response.current >
ClusterMetadata.current().epoch.getEpoch())
-
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(response.current));
-
- if (response.current >= from)
- return response;
- metadata = ClusterMetadata.current();
- }
- catch (Throwable e)
- {
- logger.info("Failed to fetch epochs [{}, {}] from {}", from,
metadata.epoch.getEpoch(), peer);
- }
+ logger.info("Fetching topologies for epochs [{}, {}] from {}",
from, metadata.epoch.getEpoch(), peers);
+ Invariants.require(from <= metadata.epoch.getEpoch(),
+ "Accord epochs should never be ahead of TCM
ones, but %d was ahead of %d", from, metadata.epoch.getEpoch());
+
+ Future<TopologyRange> futures =
FetchTopologies.fetch(SharedContext.Global.instance,
+ peers,
+ from,
+
Long.MAX_VALUE);
+ TopologyRange response = futures.get();
+ logger.info("Fetched topologies {}", response);
+
+ // We're behind and need to catch up CMS first.
+ if (response.current > ClusterMetadata.current().epoch.getEpoch())
+
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(response.current));
+
+ if (response.current >= from)
+ return response;
+ metadata = ClusterMetadata.current();
+ }
+ catch (Throwable e)
+ {
+ logger.info("Failed to fetch epochs [{}, {}] from {}", from,
metadata.epoch.getEpoch(), peers);
}
// After trying to contact all peers, and retrying according to retry
spec on them, we give up.
diff --git
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index fbc590c942..e58e691962 100644
---
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -35,6 +35,7 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
import com.google.common.base.Throwables;
+import org.apache.cassandra.gms.FailureDetector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +86,8 @@ public abstract class PaxosSimulation implements Simulation,
ClusterActionListen
protected Class<? extends Throwable>[] expectedExceptionsPaxos()
{
return (Class<? extends Throwable>[]) new Class<?>[] {
RequestExecutionException.class,
-
CancellationException.class };
+
CancellationException.class,
+
FailureDetector.UnknownEndpointException.class};
}
@SuppressWarnings("unchecked")
@@ -97,6 +99,7 @@ public abstract class PaxosSimulation implements Simulation,
ClusterActionListen
CancellationException.class,
CoordinationFailed.class,
ClosedChannelException.class,
+
FailureDetector.UnknownEndpointException.class,
StreamReceivedOutOfTokenRangeException.class // should always come in
combination with closed channel exception
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]