This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0f57457c220 IGNITE-25953 Report to log TimeoutExceptions during safe
time sync (#6389)
0f57457c220 is described below
commit 0f57457c2202b5f73bbd24a2d1cadc619ec88d44
Author: Mikhail Efremov <[email protected]>
AuthorDate: Thu Aug 14 20:55:03 2025 +0600
IGNITE-25953 Report to log TimeoutExceptions during safe time sync (#6389)
---
.../ignite/internal/replicator/ReplicaManager.java | 43 +++++++++++++++++++++-
.../ItAbstractInternalTableScanTest.java | 5 ++-
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 4910d788894..327f850313c 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -45,6 +45,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -108,6 +109,7 @@ import
org.apache.ignite.internal.replicator.exception.ExpectedReplicationExcept
import
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
import
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
@@ -147,6 +149,10 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private static final PlacementDriverMessagesFactory
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
+ /** Threshold compared against a group's timeout counter to decide whether a safe time sync timeout gets logged. */
+ private static final int MAXIMUM_ATTEMPTS_WITHOUT_LOGGING = 10;
+
+ /**
+ * Per-replication-group counters of safe time sync attempts that failed with a timeout. An entry is put (with 0)
+ * when a replica is about to start, removed when the replica is stopped, and reset to 0 when a sync fails for a
+ * reason other than a timeout.
+ */
+ private final Map<ReplicationGroupId, Integer> timeoutAttemptsCounters =
new ConcurrentHashMap<>();
+
/** Executor for the throttled log. */
// TODO: IGNITE-20063 Maybe get rid of it
private final ThreadPoolExecutor throttledLogExecutor;
@@ -745,6 +751,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
raftGroupServiceFactory
);
+ timeoutAttemptsCounters.put(replicaGrpId, 0);
+
LOG.info("Replica is about to start [replicationGroupId={}].",
replicaGrpId);
Replica newReplica = replicaFactory.apply(raftClient);
@@ -899,6 +907,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
return false;
}
+ timeoutAttemptsCounters.remove(replicaGrpId);
+
try {
// TODO: move into {@method Replica#shutdown}
https://issues.apache.org/jira/browse/IGNITE-22372
raftManager.stopRaftNodes(replicaGrpId);
@@ -1103,12 +1113,21 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
Replica replica = replicaFuture.join();
+ ReplicationGroupId replicaGroupId = replica.groupId();
+
ReplicaSafeTimeSyncRequest req =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
- .groupId(toReplicationGroupIdMessage(replica.groupId()))
+ .groupId(toReplicationGroupIdMessage(replicaGroupId))
.build();
replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
if (ex != null) {
+ if (hasCause(ex, TimeoutException.class,
ReplicationTimeoutException.class)) {
+ tryToLogTimeoutFailure(replicaGroupId, ex);
+ } else {
+ // Reset counter if timeouts aren't the reason.
+ timeoutAttemptsCounters.put(replicaGroupId, 0);
+ }
+
if (!hasCause(
ex,
NodeStoppingException.class,
@@ -1124,6 +1143,28 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
});
}
+    /**
+     * Records a timed-out safe time sync attempt for the given replication group and, once more than
+     * {@link #MAXIMUM_ATTEMPTS_WITHOUT_LOGGING} such attempts have been recorded, emits a throttled warning.
+     *
+     * <p>The counter is incremented only if an entry for the group exists. Entries are created when a replica is
+     * about to start and removed when it is stopped, so a missing entry means the replica was stopped concurrently
+     * with this sync attempt and nothing is logged. The caller resets the counter to 0 when a sync fails for a
+     * non-timeout reason. NOTE(review): the reset-on-success path is not visible in this diff; confirm the counter is
+     * also reset after a successful sync, otherwise non-consecutive timeouts accumulate.
+     *
+     * @param replicaGroupId Replication group whose safe time sync timed out.
+     * @param timeoutException Failure recognized as a timeout; passed to the log as the cause.
+     */
+    private void tryToLogTimeoutFailure(ReplicationGroupId replicaGroupId, Throwable timeoutException) {
+        Integer currentAttempt = timeoutAttemptsCounters.computeIfPresent(replicaGroupId, (id, attempts) -> attempts + 1);
+
+        // No entry for the group means the replica was stopped concurrently with this sync attempt, so skip logging.
+        // NOTE(review): the caller's unconditional put(replicaGroupId, 0) on non-timeout failures can recreate an
+        // entry that was removed on stop; replace()/computeIfPresent() there would keep this check reliable.
+        if (currentAttempt == null) {
+            return;
+        }
+
+        // Stay silent for the first MAXIMUM_ATTEMPTS_WITHOUT_LOGGING timeouts, as the constant's name promises.
+        // The counter starts at 0 and is incremented before this check, so `<` would log one attempt too early.
+        if (currentAttempt <= MAXIMUM_ATTEMPTS_WITHOUT_LOGGING) {
+            return;
+        }
+
+        throttledLog.warn(
+                "SafeTime-Sync-Timeouts", // Common key to throttle among all replicas and don't spoil the log.
+                "Failed to sync safe time for partition, the same kind of issue may affect all other replicas on this node "
+                        + "[groupId={}, attempt={}].",
+                timeoutException,
+                replicaGroupId,
+                currentAttempt
+        );
+    }
+
private boolean shouldAdvanceIdleSafeTime() {
HybridTimestamp lastProposal = lastIdleSafeTimeProposal;
if (lastProposal == null) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 7cf1b7a161d..210beb5b5f7 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.distributed;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -274,7 +275,7 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
assertTrue(subscriberFinishedLatch.await(10, TimeUnit.SECONDS),
"count=" + subscriberFinishedLatch.getCount());
- assertEquals(gotException.get().getCause().getClass(),
NoSuchElementException.class);
+ assertEquals(NoSuchElementException.class,
unwrapCause(gotException.get()).getClass());
validateTxAbortedState(tx);
}
@@ -319,7 +320,7 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
- assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
+ assertEquals(StorageException.class,
unwrapCause(gotException.get()).getClass());
validateTxAbortedState(tx);
}