This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c834f75671f IGNITE-26669 Tx state resolution improvements (#6807)
c834f75671f is described below

commit c834f75671f03930dbb1761f1b5c30e3daf322dc
Author: Egor <[email protected]>
AuthorDate: Tue Oct 21 16:45:06 2025 +0400

    IGNITE-26669 Tx state resolution improvements (#6807)
    
    Co-authored-by: Egor Kuts <[email protected]>
---
 .../replicator/TransactionStateResolver.java       |  18 +-
 .../replicator/TransactionStateResolverTest.java   | 246 +++++++++++++++++++++
 .../ignite/internal/table/TxAbstractTest.java      |   1 -
 .../ignite/internal/tx/impl/TxMessageSender.java   |   7 +-
 4 files changed, 262 insertions(+), 10 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index e95a42b4aee..da3cc3dab03 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
@@ -213,18 +215,22 @@ public class TransactionStateResolver {
         updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
 
         InternalClusterNode coordinator = coordinatorId == null ? null : 
clusterNodeResolver.getById(coordinatorId);
-
         if (coordinator == null) {
             // This means the coordinator node have either left the cluster or 
restarted.
             markAbandoned(txId);
 
             resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
         } else {
-            txMessageSender.resolveTxStateFromCoordinator(coordinator.name(), 
txId, timestamp)
+            txMessageSender.resolveTxStateFromCoordinator(coordinator, txId, 
timestamp)
                     .whenComplete((response, e) -> {
-                        if (e == null) {
-                            
txMetaFuture.complete(asTransactionMeta(response.txStateMeta()));
+                        if (e == null && response.txStateMeta() != null) {
+                            TransactionMetaMessage transactionMetaMessage = 
Objects.requireNonNull(response.txStateMeta(),
+                                    "Transaction state meta must not be null 
after check.");
+                            
txMetaFuture.complete(asTransactionMeta(transactionMetaMessage));
                         } else {
+                            if (e != null && e.getCause() instanceof 
RecipientLeftException) {
+                                markAbandoned(txId);
+                            }
                             resolveTxStateFromCommitPartition(txId, 
commitGrpId, txMetaFuture);
                         }
                     });
@@ -318,7 +324,7 @@ public class TransactionStateResolver {
         return transactionMeta == null ? null : 
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, 
TX_MESSAGES_FACTORY);
     }
 
-    private static @Nullable TransactionMeta asTransactionMeta(@Nullable 
TransactionMetaMessage transactionMetaMessage) {
-        return transactionMetaMessage == null ? null : 
transactionMetaMessage.asTransactionMeta();
+    private static TransactionMeta asTransactionMeta(TransactionMetaMessage 
transactionMetaMessage) {
+        return transactionMetaMessage.asTransactionMeta();
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
new file mode 100644
index 00000000000..1b15cc18b86
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link TransactionStateResolver}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TransactionStateResolverTest extends BaseIgniteAbstractTest {
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    private static final InternalClusterNode COORDINATOR_NODE =
+            new ClusterNodeImpl(UUID.randomUUID(), "coordinator", new 
NetworkAddress("127.0.0.1", 10800), null);
+
+    @Mock
+    private TxManager txManager;
+
+    @Mock
+    private ClusterNodeResolver clusterNodeResolver;
+
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private PlacementDriver placementDriver;
+
+    @Mock
+    private TxMessageSender txMessageSender;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClockService clockService = new TestClockService(clock);
+
+    private TransactionStateResolver resolver;
+
+    @BeforeEach
+    void setUp() {
+        resolver = new TransactionStateResolver(
+                txManager,
+                clockService,
+                clusterNodeResolver,
+                messagingService,
+                placementDriver,
+                txMessageSender
+        );
+
+        // Setup default mock for PlacementDriver to avoid timeouts.
+        lenient().when(placementDriver.awaitPrimaryReplica(any(), any(), 
anyLong(), any()))
+                .thenReturn(completedFuture(new TestReplicaMetaImpl(
+                        COORDINATOR_NODE,
+                        hybridTimestamp(1),
+                        HybridTimestamp.MAX_VALUE
+                )));
+    }
+
+    @Test
+    void testResolveTxStateFromCoordinatorWithNullResponse() {
+        UUID txId = UUID.randomUUID();
+        UUID coordinatorId = COORDINATOR_NODE.id();
+        TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+        HybridTimestamp timestamp = clock.now();
+
+        TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING, 
coordinatorId, commitPartitionId, null, null, null);
+
+        when(txManager.stateMeta(txId)).thenReturn(pendingMeta);
+        
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE);
+
+        // Mock TxStateResponse with null txStateMeta.
+        TxStateResponse response = TX_MESSAGES_FACTORY.txStateResponse()
+                .txStateMeta(null)
+                .timestamp(timestamp)
+                .build();
+
+        
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
 eq(txId), any(HybridTimestamp.class)))
+                .thenReturn(completedFuture(response));
+
+        // Mock the commit partition resolution to complete the future.
+        
when(txMessageSender.resolveTxStateFromCommitPartition(any(String.class), 
eq(txId), eq(commitPartitionId), any(Long.class)))
+                .thenReturn(completedFuture(pendingMeta));
+
+        CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+                txId,
+                commitPartitionId,
+                timestamp
+        );
+
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that coordinator was called with InternalClusterNode.
+        ArgumentCaptor<InternalClusterNode> nodeCaptor = 
ArgumentCaptor.forClass(InternalClusterNode.class);
+        
verify(txMessageSender).resolveTxStateFromCoordinator(nodeCaptor.capture(), 
eq(txId), any(HybridTimestamp.class));
+        assertEquals(COORDINATOR_NODE, nodeCaptor.getValue());
+
+        // Verify fallback to commit partition was called.
+        
verify(txMessageSender).resolveTxStateFromCommitPartition(any(String.class), 
eq(txId), eq(commitPartitionId), any(Long.class));
+    }
+
+    @Test
+    void testResolveTxStateFromRestartedCoordinator() {
+        UUID txId = UUID.randomUUID();
+        UUID coordinatorId = COORDINATOR_NODE.id();
+        TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+        HybridTimestamp timestamp = clock.now();
+
+        TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING, 
coordinatorId, commitPartitionId, null, null, null);
+
+        when(txManager.stateMeta(txId)).thenReturn(pendingMeta);
+        
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE);
+
+        CompletableFuture<TxStateResponse> failedFuture = 
CompletableFuture.failedFuture(new RecipientLeftException());
+
+        
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
 eq(txId), any(HybridTimestamp.class)))
+                .thenReturn(failedFuture);
+        // Mock the commit partition resolution to complete the future.
+        TxStateMeta abandonedMeta = new TxStateMeta(TxState.ABANDONED, 
coordinatorId, commitPartitionId, null, null, null);
+        
when(txMessageSender.resolveTxStateFromCommitPartition(any(String.class), 
eq(txId), eq(commitPartitionId), any(Long.class)))
+                .thenReturn(completedFuture(abandonedMeta));
+
+        CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+                txId,
+                commitPartitionId,
+                timestamp
+        );
+
+        assertThat(result, willCompleteSuccessfully());
+        // Verify that transaction was marked as abandoned.
+        ArgumentCaptor<Function<TxStateMeta, TxStateMeta>> updaterCaptor = 
ArgumentCaptor.forClass(Function.class);
+        verify(txManager, atLeastOnce()).updateTxMeta(eq(txId), 
updaterCaptor.capture());
+        // Passed state updater should result in abandon state.
+        TxStateMeta afterUpdateState = 
updaterCaptor.getValue().apply(pendingMeta);
+        assertEquals(TxState.ABANDONED, afterUpdateState.txState());
+        // Verify fallback to commit partition was called.
+        
verify(txMessageSender).resolveTxStateFromCommitPartition(any(String.class), 
eq(txId), eq(commitPartitionId), any(Long.class));
+    }
+
+    @Test
+    void testResolveTxStateFromCoordinatorWithValidResponse() {
+        UUID txId = UUID.randomUUID();
+        UUID coordinatorId = COORDINATOR_NODE.id();
+        TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+        HybridTimestamp timestamp = clock.now();
+
+        TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING, 
coordinatorId, commitPartitionId, null, null, null);
+        TxStateMeta committedMeta = new TxStateMeta(TxState.COMMITTED, 
coordinatorId, commitPartitionId, timestamp, null, null);
+
+        when(txManager.stateMeta(txId)).thenReturn(pendingMeta);
+        
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE);
+
+        // Create a valid TransactionMetaMessage.
+        TransactionMetaMessage metaMessage = 
committedMeta.toTransactionMetaMessage(
+                new ReplicaMessagesFactory(),
+                TX_MESSAGES_FACTORY
+        );
+
+        TxStateResponse response = TX_MESSAGES_FACTORY.txStateResponse()
+                .txStateMeta(metaMessage)
+                .timestamp(timestamp)
+                .build();
+
+        
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
 eq(txId), any(HybridTimestamp.class)))
+                .thenReturn(completedFuture(response));
+
+        CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+                txId,
+                commitPartitionId,
+                timestamp
+        );
+
+        assertThat(result, willCompleteSuccessfully());
+
+        // Verify that coordinator was called with InternalClusterNode.
+        ArgumentCaptor<InternalClusterNode> nodeCaptor = 
ArgumentCaptor.forClass(InternalClusterNode.class);
+        
verify(txMessageSender).resolveTxStateFromCoordinator(nodeCaptor.capture(), 
eq(txId), any(HybridTimestamp.class));
+        assertEquals(COORDINATOR_NODE, nodeCaptor.getValue());
+
+        // Verify that commit partition fallback was NOT called.
+        verify(txMessageSender, never())
+                .resolveTxStateFromCommitPartition(any(String.class), 
eq(txId), eq(commitPartitionId), any(Long.class));
+
+        // Verify that the transaction meta was updated.
+        verify(txManager).updateTxMeta(eq(txId), any());
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 265f3488b79..55c6281a3be 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -526,7 +526,6 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
      * Tests uncaught exception in the closure.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26669")
     public void testTxClosureUncaughtExceptionAsync() {
         double balance = 10.;
         double delta = 50.;
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 7fb40c0f2d8..9d1c3945dcf 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -207,18 +208,18 @@ public class TxMessageSender {
     /**
      * Send TxStateCoordinatorRequest.
      *
-     * @param primaryConsistentId Node id to send the request to.
+     * @param coordinatorClusterNode Node to send the request to.
      * @param txId Transaction id.
      * @param timestamp Timestamp to pass to target node.
      * @return Completable future of {@link TxStateResponse}.
      */
     public CompletableFuture<TxStateResponse> resolveTxStateFromCoordinator(
-            String primaryConsistentId,
+            InternalClusterNode coordinatorClusterNode,
             UUID txId,
             HybridTimestamp timestamp
     ) {
         return messagingService.invoke(
-                        primaryConsistentId,
+                        coordinatorClusterNode,
                         TX_MESSAGES_FACTORY.txStateCoordinatorRequest()
                                 .readTimestamp(timestamp)
                                 .txId(txId)

Reply via email to