This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f57457c220 IGNITE-25953 Report to log TimeoutExceptions during safe time sync (#6389)
0f57457c220 is described below

commit 0f57457c2202b5f73bbd24a2d1cadc619ec88d44
Author: Mikhail Efremov <[email protected]>
AuthorDate: Thu Aug 14 20:55:03 2025 +0600

    IGNITE-25953 Report to log TimeoutExceptions during safe time sync (#6389)
---
 .../ignite/internal/replicator/ReplicaManager.java | 43 +++++++++++++++++++++-
 .../ItAbstractInternalTableScanTest.java           |  5 ++-
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 4910d788894..327f850313c 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -45,6 +45,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
@@ -108,6 +109,7 @@ import 
org.apache.ignite.internal.replicator.exception.ExpectedReplicationExcept
 import 
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
+import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
@@ -147,6 +149,10 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
     private static final PlacementDriverMessagesFactory 
PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
+    private static final int MAXIMUM_ATTEMPTS_WITHOUT_LOGGING = 10; // Per-group safe-time sync timeout count at which warnings start being logged.
+
+    private final Map<ReplicationGroupId, Integer> timeoutAttemptsCounters = 
new ConcurrentHashMap<>(); // Safe-time sync timeout counter per replication group: set to 0 on replica start, removed on replica stop.
+
     /** Executor for the throttled log. */
     // TODO: IGNITE-20063 Maybe get rid of it
     private final ThreadPoolExecutor throttledLogExecutor;
@@ -745,6 +751,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 raftGroupServiceFactory
         );
 
+        timeoutAttemptsCounters.put(replicaGrpId, 0);
+
         LOG.info("Replica is about to start [replicationGroupId={}].", 
replicaGrpId);
 
         Replica newReplica = replicaFactory.apply(raftClient);
@@ -899,6 +907,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                         return false;
                     }
 
+                    timeoutAttemptsCounters.remove(replicaGrpId);
+
                     try {
                         // TODO: move into {@method Replica#shutdown} 
https://issues.apache.org/jira/browse/IGNITE-22372
                         raftManager.stopRaftNodes(replicaGrpId);
@@ -1103,12 +1113,21 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         Replica replica = replicaFuture.join();
 
+        ReplicationGroupId replicaGroupId = replica.groupId();
+
         ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .groupId(toReplicationGroupIdMessage(replicaGroupId))
                 .build();
 
         replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
             if (ex != null) {
+                if (hasCause(ex, TimeoutException.class, 
ReplicationTimeoutException.class)) {
+                    tryToLogTimeoutFailure(replicaGroupId, ex);
+                } else {
+                    // Reset counter if timeouts aren't the reason.
+                    timeoutAttemptsCounters.put(replicaGroupId, 0);
+                }
+
                 if (!hasCause(
                         ex,
                         NodeStoppingException.class,
@@ -1124,6 +1143,28 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         });
     }
 
+    private void tryToLogTimeoutFailure(ReplicationGroupId replicaGroupId, 
Throwable timeoutException) { // Increments the group's timeout counter and emits a throttled warning once it is at least MAXIMUM_ATTEMPTS_WITHOUT_LOGGING.
+        Integer currentAttempt = 
timeoutAttemptsCounters.computeIfPresent(replicaGroupId, (id, attempts) -> 
attempts + 1);
+
+        // No counter entry for the group means the replica was stopped concurrently (its entry was removed), so skip logging.
+        if (currentAttempt == null) {
+            return;
+        }
+
+        if (currentAttempt < MAXIMUM_ATTEMPTS_WITHOUT_LOGGING) { // Stay silent until enough timeouts have accumulated for this group.
+            return;
+        }
+
+        throttledLog.warn(
+                "SafeTime-Sync-Timeouts", // Shared throttle key for all replicas, so the log is not flooded.
+                "Failed to sync safe time for partition, the same kind of 
issue may affect all other replicas on this node "
+                        + "[groupId={}, attempt={}].",
+                timeoutException,
+                replicaGroupId,
+                currentAttempt
+        );
+    }
+
     private boolean shouldAdvanceIdleSafeTime() {
         HybridTimestamp lastProposal = lastIdleSafeTimeProposal;
         if (lastProposal == null) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 7cf1b7a161d..210beb5b5f7 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.distributed;
 
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -274,7 +275,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
 
         assertTrue(subscriberFinishedLatch.await(10, TimeUnit.SECONDS), 
"count=" + subscriberFinishedLatch.getCount());
 
-        assertEquals(gotException.get().getCause().getClass(), 
NoSuchElementException.class);
+        assertEquals(NoSuchElementException.class, 
unwrapCause(gotException.get()).getClass());
 
         validateTxAbortedState(tx);
     }
@@ -319,7 +320,7 @@ public abstract class ItAbstractInternalTableScanTest 
extends IgniteAbstractTest
 
         assertTrue(gotExceptionLatch.await(10_000, TimeUnit.MILLISECONDS));
 
-        assertEquals(gotException.get().getCause().getClass(), 
StorageException.class);
+        assertEquals(StorageException.class, 
unwrapCause(gotException.get()).getClass());
 
         validateTxAbortedState(tx);
     }

Reply via email to