This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5be587bb5fc IGNITE-28248 Fix txn cleanup retry sent on stale node (#7843)
5be587bb5fc is described below

commit 5be587bb5fc03243b6f8f93815296d3cadeaa3d2
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Mar 26 21:18:49 2026 +0200

    IGNITE-28248 Fix txn cleanup retry sent on stale node (#7843)
---
 .../handlers/TxCleanupRecoveryRequestHandler.java     |  7 ++++++-
 .../internal/tx/impl/TransactionStateResolver.java    |  6 +++---
 .../internal/tx/impl/TxCleanupRequestSender.java      | 19 +++++++++++++++++--
 .../apache/ignite/internal/tx/impl/TxManagerImpl.java |  3 ++-
 .../org/apache/ignite/internal/tx/TxCleanupTest.java  |  3 ++-
 5 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
index 9a4900081be..9fd5ddeedbd 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.handlers;
 
+import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
 import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.TxManager;
@@ -50,6 +52,8 @@ public class TxCleanupRecoveryRequestHandler {
     private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRecoveryRequestHandler.class);
     private static final int THROTTLE_BATCH_SIZE = 1000;
 
+    private final IgniteThrottledLogger throttledLog = toThrottledLogger(LOG);
+
     private final TxStatePartitionStorage txStatePartitionStorage;
     private final TxManager txManager;
     private final FailureProcessor failureProcessor;
@@ -152,7 +156,8 @@ public class TxCleanupRecoveryRequestHandler {
                 txMeta.commitTimestamp(),
                 txId
         ).exceptionally(throwable -> {
-            LOG.warn(
+            throttledLog.warn(
+                    "Failed to cleanup transaction",
                     "Failed to cleanup transaction {}.",
                     throwable,
                     formatTxInfo(txId, txManager)
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
index 541582d5ca4..6e68def97c8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
@@ -66,14 +66,14 @@ import org.jetbrains.annotations.Nullable;
 public class TransactionStateResolver {
     private static final IgniteLogger LOG = Loggers.forClass(TransactionStateResolver.class);
 
-    private final IgniteThrottledLogger throttledLogger;
-
     /** Tx messages factory. */
     private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
 
     /** Replica messages factory. */
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
 
+    private final IgniteThrottledLogger throttledLogger;
+
     // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this ticket this resolver will be no longer needed, as
     //  we will store coordinator as ClusterNode in local tx state map.
     /** Function that resolves a node consistent ID to a cluster node. */
@@ -145,7 +145,7 @@ public class TransactionStateResolver {
                         .whenComplete((txStateMeta, e) -> {
                             if (e != null) {
                                 Throwable cause = unwrapCause(e);
-                                throttledLogger.info(cause.getMessage());
+                                throttledLogger.info("Failed to abort the transaction on coordinator", cause.getMessage());
 
                                 // Will cause fallback to commit partition path.
                                 txStateMeta = TxStateMeta.builder(UNKNOWN).build();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index b132d5851c9..fa17ba72e74 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -46,6 +46,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.PartitionEnlistment;
@@ -69,6 +71,8 @@ public class TxCleanupRequestSender {
     private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
     private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
 
+    private static final String UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY = "Unsuccessful transaction cleanup after N attempts";
+
     private final IgniteThrottledLogger throttledLog;
 
     /** Placement driver helper. */
@@ -82,6 +86,8 @@ public class TxCleanupRequestSender {
     /** Local transaction state storage. */
     private final VolatileTxStateMetaStorage txStateVolatileStorage;
 
+    private final ClusterNodeResolver clusterNodeResolver;
+
     /** Executor that executes async cleanup actions. */
     private final ExecutorService cleanupExecutor;
 
@@ -96,19 +102,22 @@ public class TxCleanupRequestSender {
      * @param txStateVolatileStorage Volatile transaction state storage.
      * @param cleanupExecutor Cleanup executor.
      * @param commonScheduler Common scheduler.
+     * @param clusterNodeResolver Cluster node resolver.
      */
     public TxCleanupRequestSender(
             TxMessageSender txMessageSender,
             PlacementDriverHelper placementDriverHelper,
             VolatileTxStateMetaStorage txStateVolatileStorage,
             ExecutorService cleanupExecutor,
-            ScheduledExecutorService commonScheduler
+            ScheduledExecutorService commonScheduler,
+            ClusterNodeResolver clusterNodeResolver
     ) {
         this.txMessageSender = txMessageSender;
         this.placementDriverHelper = placementDriverHelper;
         this.txStateVolatileStorage = txStateVolatileStorage;
         this.cleanupExecutor = cleanupExecutor;
         this.retryExecutor = commonScheduler;
+        this.clusterNodeResolver = clusterNodeResolver;
         this.throttledLog = toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), commonScheduler);
     }
 
@@ -392,6 +401,7 @@ public class TxCleanupRequestSender {
                         if (ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
                             if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
                                 throttledLog.warn(
+                                        UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
                                         "Unsuccessful transaction cleanup after {} attempts, keep retrying [txId={}]",
                                         throwable,
                                         ATTEMPTS_LOG_THRESHOLD,
@@ -408,7 +418,12 @@ public class TxCleanupRequestSender {
                             // At the end of the day all write intents will be properly converted.
                             if (partitions == null) {
                                 // If we don't have any partition, which is the recovery or "unlock only" case,
-                                // just try again with the same node.
+                                // just try again with the same node, if it is online.
+                                InternalClusterNode n = clusterNodeResolver.getByConsistentId(node);
+                                if (n == null) {
+                                    return CompletableFutures.<Void>nullCompletedFuture();
+                                }
+
                                 return scheduleRetry(
                                         () -> sendCleanupMessageWithRetries(
                                                 commitPartitionId,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 22824425b02..c19c80b3131 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -410,7 +410,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi
                 placementDriverHelper,
                 txStateVolatileStorage,
                 writeIntentSwitchPool,
-                commonScheduler
+                commonScheduler,
+                topologyService
         );
 
         txMetrics = new TransactionMetricsSource(clockService);
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 1732625a81e..7cbc2d99da7 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -148,7 +148,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
                 placementDriverHelper,
                 mock(VolatileTxStateMetaStorage.class),
                 testSyncExecutorService(),
-                testSyncScheduledExecutorService()
+                testSyncScheduledExecutorService(),
+                topologyService
         );
     }
 

Reply via email to