This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5be587bb5fc IGNITE-28248 Fix txn cleanup retry sent on stale node (#7843)
5be587bb5fc is described below
commit 5be587bb5fc03243b6f8f93815296d3cadeaa3d2
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Mar 26 21:18:49 2026 +0200
IGNITE-28248 Fix txn cleanup retry sent on stale node (#7843)
---
.../handlers/TxCleanupRecoveryRequestHandler.java | 7 ++++++-
.../internal/tx/impl/TransactionStateResolver.java | 6 +++---
.../internal/tx/impl/TxCleanupRequestSender.java | 19 +++++++++++++++++--
.../apache/ignite/internal/tx/impl/TxManagerImpl.java | 3 ++-
.../org/apache/ignite/internal/tx/TxCleanupTest.java | 3 ++-
5 files changed, 30 insertions(+), 8 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
index 9a4900081be..9fd5ddeedbd 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.handlers;
+import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger;
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TxManager;
@@ -50,6 +52,8 @@ public class TxCleanupRecoveryRequestHandler {
private static final IgniteLogger LOG =
Loggers.forClass(TxCleanupRecoveryRequestHandler.class);
private static final int THROTTLE_BATCH_SIZE = 1000;
+ private final IgniteThrottledLogger throttledLog = toThrottledLogger(LOG);
+
private final TxStatePartitionStorage txStatePartitionStorage;
private final TxManager txManager;
private final FailureProcessor failureProcessor;
@@ -152,7 +156,8 @@ public class TxCleanupRecoveryRequestHandler {
txMeta.commitTimestamp(),
txId
).exceptionally(throwable -> {
- LOG.warn(
+ throttledLog.warn(
+ "Failed to cleanup transaction",
"Failed to cleanup transaction {}.",
throwable,
formatTxInfo(txId, txManager)
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
index 541582d5ca4..6e68def97c8 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionStateResolver.java
@@ -66,14 +66,14 @@ import org.jetbrains.annotations.Nullable;
public class TransactionStateResolver {
private static final IgniteLogger LOG =
Loggers.forClass(TransactionStateResolver.class);
- private final IgniteThrottledLogger throttledLogger;
-
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
/** Replica messages factory. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+ private final IgniteThrottledLogger throttledLogger;
+
// TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this
ticket this resolver will be no longer needed, as
// we will store coordinator as ClusterNode in local tx state map.
/** Function that resolves a node consistent ID to a cluster node. */
@@ -145,7 +145,7 @@ public class TransactionStateResolver {
.whenComplete((txStateMeta, e) -> {
if (e != null) {
Throwable cause = unwrapCause(e);
- throttledLogger.info(cause.getMessage());
+ throttledLogger.info("Failed to abort the
transaction on coordinator", cause.getMessage());
// Will cause fallback to commit partition
path.
txStateMeta =
TxStateMeta.builder(UNKNOWN).build();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index b132d5851c9..fa17ba72e74 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -46,6 +46,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.PartitionEnlistment;
@@ -69,6 +71,8 @@ public class TxCleanupRequestSender {
private static final int RETRY_INITIAL_TIMEOUT_MS = 20;
private static final int RETRY_MAX_TIMEOUT_MS = 30_000;
+ private static final String UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY =
"Unsuccessful transaction cleanup after N attempts";
+
private final IgniteThrottledLogger throttledLog;
/** Placement driver helper. */
@@ -82,6 +86,8 @@ public class TxCleanupRequestSender {
/** Local transaction state storage. */
private final VolatileTxStateMetaStorage txStateVolatileStorage;
+ private final ClusterNodeResolver clusterNodeResolver;
+
/** Executor that executes async cleanup actions. */
private final ExecutorService cleanupExecutor;
@@ -96,19 +102,22 @@ public class TxCleanupRequestSender {
* @param txStateVolatileStorage Volatile transaction state storage.
* @param cleanupExecutor Cleanup executor.
* @param commonScheduler Common scheduler.
+ * @param clusterNodeResolver Cluster node resolver.
*/
public TxCleanupRequestSender(
TxMessageSender txMessageSender,
PlacementDriverHelper placementDriverHelper,
VolatileTxStateMetaStorage txStateVolatileStorage,
ExecutorService cleanupExecutor,
- ScheduledExecutorService commonScheduler
+ ScheduledExecutorService commonScheduler,
+ ClusterNodeResolver clusterNodeResolver
) {
this.txMessageSender = txMessageSender;
this.placementDriverHelper = placementDriverHelper;
this.txStateVolatileStorage = txStateVolatileStorage;
this.cleanupExecutor = cleanupExecutor;
this.retryExecutor = commonScheduler;
+ this.clusterNodeResolver = clusterNodeResolver;
this.throttledLog =
toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class),
commonScheduler);
}
@@ -392,6 +401,7 @@ public class TxCleanupRequestSender {
if
(ReplicatorRecoverableExceptions.isRecoverable(throwable)) {
if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) {
throttledLog.warn(
+ UNSUCCESSFUL_TXN_CLEANUP_LOG_KEY,
"Unsuccessful transaction cleanup
after {} attempts, keep retrying [txId={}]",
throwable,
ATTEMPTS_LOG_THRESHOLD,
@@ -408,7 +418,12 @@ public class TxCleanupRequestSender {
// At the end of the day all write intents will be
properly converted.
if (partitions == null) {
// If we don't have any partition, which is
the recovery or "unlock only" case,
- // just try again with the same node.
+ // just try again with the same node, if it is
online.
+ InternalClusterNode n =
clusterNodeResolver.getByConsistentId(node);
+ if (n == null) {
+ return
CompletableFutures.<Void>nullCompletedFuture();
+ }
+
return scheduleRetry(
() -> sendCleanupMessageWithRetries(
commitPartitionId,
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 22824425b02..c19c80b3131 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -410,7 +410,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
placementDriverHelper,
txStateVolatileStorage,
writeIntentSwitchPool,
- commonScheduler
+ commonScheduler,
+ topologyService
);
txMetrics = new TransactionMetricsSource(clockService);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
index 1732625a81e..7cbc2d99da7 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java
@@ -148,7 +148,8 @@ public class TxCleanupTest extends IgniteAbstractTest {
placementDriverHelper,
mock(VolatileTxStateMetaStorage.class),
testSyncExecutorService(),
- testSyncScheduledExecutorService()
+ testSyncScheduledExecutorService(),
+ topologyService
);
}