This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c834f75671f IGNITE-26669 Tx state resolution improvements (#6807)
c834f75671f is described below
commit c834f75671f03930dbb1761f1b5c30e3daf322dc
Author: Egor <[email protected]>
AuthorDate: Tue Oct 21 16:45:06 2025 +0400
IGNITE-26669 Tx state resolution improvements (#6807)
Co-authored-by: Egor Kuts <[email protected]>
---
.../replicator/TransactionStateResolver.java | 18 +-
.../replicator/TransactionStateResolverTest.java | 246 +++++++++++++++++++++
.../ignite/internal/table/TxAbstractTest.java | 1 -
.../ignite/internal/tx/impl/TxMessageSender.java | 7 +-
4 files changed, 262 insertions(+), 10 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index e95a42b4aee..da3cc3dab03 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
@@ -213,18 +215,22 @@ public class TransactionStateResolver {
updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
InternalClusterNode coordinator = coordinatorId == null ? null :
clusterNodeResolver.getById(coordinatorId);
-
if (coordinator == null) {
// This means the coordinator node has either left the cluster or
restarted.
markAbandoned(txId);
resolveTxStateFromCommitPartition(txId, commitGrpId, txMetaFuture);
} else {
- txMessageSender.resolveTxStateFromCoordinator(coordinator.name(),
txId, timestamp)
+ txMessageSender.resolveTxStateFromCoordinator(coordinator, txId,
timestamp)
.whenComplete((response, e) -> {
- if (e == null) {
-
txMetaFuture.complete(asTransactionMeta(response.txStateMeta()));
+ if (e == null && response.txStateMeta() != null) {
+ TransactionMetaMessage transactionMetaMessage =
Objects.requireNonNull(response.txStateMeta(),
+ "Transaction state meta must not be null
after check.");
+
txMetaFuture.complete(asTransactionMeta(transactionMetaMessage));
} else {
+ if (e != null && e.getCause() instanceof
RecipientLeftException) {
+ markAbandoned(txId);
+ }
resolveTxStateFromCommitPartition(txId,
commitGrpId, txMetaFuture);
}
});
@@ -318,7 +324,7 @@ public class TransactionStateResolver {
return transactionMeta == null ? null :
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY,
TX_MESSAGES_FACTORY);
}
- private static @Nullable TransactionMeta asTransactionMeta(@Nullable
TransactionMetaMessage transactionMetaMessage) {
- return transactionMetaMessage == null ? null :
transactionMetaMessage.asTransactionMeta();
+ private static TransactionMeta asTransactionMeta(TransactionMetaMessage
transactionMetaMessage) {
+ return transactionMetaMessage.asTransactionMeta();
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
new file mode 100644
index 00000000000..1b15cc18b86
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolverTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link TransactionStateResolver}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TransactionStateResolverTest extends BaseIgniteAbstractTest {
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
+
+ private static final InternalClusterNode COORDINATOR_NODE =
+ new ClusterNodeImpl(UUID.randomUUID(), "coordinator", new
NetworkAddress("127.0.0.1", 10800), null);
+
+ @Mock
+ private TxManager txManager;
+
+ @Mock
+ private ClusterNodeResolver clusterNodeResolver;
+
+ @Mock
+ private MessagingService messagingService;
+
+ @Mock
+ private PlacementDriver placementDriver;
+
+ @Mock
+ private TxMessageSender txMessageSender;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ private final ClockService clockService = new TestClockService(clock);
+
+ private TransactionStateResolver resolver;
+ /**
+ * Builds the resolver under test from the mocked collaborators and the test clock service, and installs a
+ * default placement driver stub that is available to every test.
+ */
+ @BeforeEach
+ void setUp() {
+ resolver = new TransactionStateResolver(
+ txManager,
+ clockService,
+ clusterNodeResolver,
+ messagingService,
+ placementDriver,
+ txMessageSender
+ );
+
+ // Default PlacementDriver stub: awaitPrimaryReplica completes immediately to avoid timeouts; lenient(), presumably because not every test reaches it.
+ lenient().when(placementDriver.awaitPrimaryReplica(any(), any(),
anyLong(), any()))
+ .thenReturn(completedFuture(new TestReplicaMetaImpl(
+ COORDINATOR_NODE,
+ hybridTimestamp(1),
+ HybridTimestamp.MAX_VALUE
+ )));
+ }
+ /**
+ * The coordinator is resolvable and answers, but its {@link TxStateResponse} carries a {@code null} state meta.
+ * Expects the request to be addressed to the resolved {@link InternalClusterNode} and the commit partition to be
+ * queried as a fallback.
+ */
+ @Test
+ void testResolveTxStateFromCoordinatorWithNullResponse() {
+ UUID txId = UUID.randomUUID();
+ UUID coordinatorId = COORDINATOR_NODE.id();
+ TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+ HybridTimestamp timestamp = clock.now();
+
+ TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING,
coordinatorId, commitPartitionId, null, null, null);
+
+ when(txManager.stateMeta(txId)).thenReturn(pendingMeta); // Local view of the transaction: still PENDING.
+
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE); // Coordinator is resolvable by id.
+
+ // Coordinator response that deliberately carries no transaction state meta (txStateMeta == null).
+ TxStateResponse response = TX_MESSAGES_FACTORY.txStateResponse()
+ .txStateMeta(null)
+ .timestamp(timestamp)
+ .build();
+
+
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
eq(txId), any(HybridTimestamp.class)))
+ .thenReturn(completedFuture(response));
+
+ // Commit partition stub: completes with the PENDING meta so that the fallback path can complete the future.
+
when(txMessageSender.resolveTxStateFromCommitPartition(any(String.class),
eq(txId), eq(commitPartitionId), any(Long.class)))
+ .thenReturn(completedFuture(pendingMeta));
+
+ CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+ txId,
+ commitPartitionId,
+ timestamp
+ );
+
+ assertThat(result, willCompleteSuccessfully()); // Only successful completion is asserted, not the resolved state.
+
+ // Verify that the coordinator request was addressed to the InternalClusterNode returned by the node resolver.
+ ArgumentCaptor<InternalClusterNode> nodeCaptor =
ArgumentCaptor.forClass(InternalClusterNode.class);
+
verify(txMessageSender).resolveTxStateFromCoordinator(nodeCaptor.capture(),
eq(txId), any(HybridTimestamp.class));
+ assertEquals(COORDINATOR_NODE, nodeCaptor.getValue());
+
+ // Verify the fallback: the commit partition was queried for this transaction.
+
verify(txMessageSender).resolveTxStateFromCommitPartition(any(String.class),
eq(txId), eq(commitPartitionId), any(Long.class));
+ }
+ /**
+ * The coordinator is resolvable by id, but the state request fails with {@link RecipientLeftException}.
+ * Expects a transaction meta update whose updater yields {@link TxState#ABANDONED} and a fallback to the commit
+ * partition.
+ *
+ * <p>NOTE(review): the stubbed future fails with a bare {@code RecipientLeftException}, whereas the resolver
+ * inspects {@code e.getCause()}. In addition, the commit partition stub itself returns an ABANDONED meta and only
+ * the last captured updater is checked, so the ABANDONED assertion may be satisfied by the post-resolution meta
+ * update rather than by {@code markAbandoned} - TODO confirm.
+ */
+ @Test
+ void testResolveTxStateFromRestartedCoordinator() {
+ UUID txId = UUID.randomUUID();
+ UUID coordinatorId = COORDINATOR_NODE.id();
+ TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+ HybridTimestamp timestamp = clock.now();
+
+ TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING,
coordinatorId, commitPartitionId, null, null, null);
+
+ when(txManager.stateMeta(txId)).thenReturn(pendingMeta); // Local view of the transaction: still PENDING.
+
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE); // Coordinator is resolvable by id.
+
+ CompletableFuture<TxStateResponse> failedFuture =
CompletableFuture.failedFuture(new RecipientLeftException());
+
+
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
eq(txId), any(HybridTimestamp.class)))
+ .thenReturn(failedFuture);
+ // Commit partition stub: reports the transaction as ABANDONED, which completes the future.
+ TxStateMeta abandonedMeta = new TxStateMeta(TxState.ABANDONED,
coordinatorId, commitPartitionId, null, null, null);
+
when(txMessageSender.resolveTxStateFromCommitPartition(any(String.class),
eq(txId), eq(commitPartitionId), any(Long.class)))
+ .thenReturn(completedFuture(abandonedMeta));
+
+ CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+ txId,
+ commitPartitionId,
+ timestamp
+ );
+
+ assertThat(result, willCompleteSuccessfully());
+ // Verify that the transaction was marked as abandoned: capture the updater(s) passed to updateTxMeta.
+ ArgumentCaptor<Function<TxStateMeta, TxStateMeta>> updaterCaptor =
ArgumentCaptor.forClass(Function.class);
+ verify(txManager, atLeastOnce()).updateTxMeta(eq(txId),
updaterCaptor.capture());
+ // Applying the last captured updater to the PENDING meta should result in the ABANDONED state.
+ TxStateMeta afterUpdateState =
updaterCaptor.getValue().apply(pendingMeta);
+ assertEquals(TxState.ABANDONED, afterUpdateState.txState());
+ // Verify that the fallback to the commit partition was performed.
+
verify(txMessageSender).resolveTxStateFromCommitPartition(any(String.class),
eq(txId), eq(commitPartitionId), any(Long.class));
+ }
+ /**
+ * The coordinator answers with a non-null COMMITTED state meta. Expects that the request is addressed to the
+ * resolved {@link InternalClusterNode}, that the commit partition is never queried, and that the transaction meta
+ * is updated exactly once.
+ */
+ @Test
+ void testResolveTxStateFromCoordinatorWithValidResponse() {
+ UUID txId = UUID.randomUUID();
+ UUID coordinatorId = COORDINATOR_NODE.id();
+ TablePartitionId commitPartitionId = new TablePartitionId(1, 0);
+ HybridTimestamp timestamp = clock.now();
+
+ TxStateMeta pendingMeta = new TxStateMeta(TxState.PENDING,
coordinatorId, commitPartitionId, null, null, null);
+ TxStateMeta committedMeta = new TxStateMeta(TxState.COMMITTED,
coordinatorId, commitPartitionId, timestamp, null, null);
+
+ when(txManager.stateMeta(txId)).thenReturn(pendingMeta); // Local view of the transaction: still PENDING.
+
when(clusterNodeResolver.getById(coordinatorId)).thenReturn(COORDINATOR_NODE); // Coordinator is resolvable by id.
+
+ // Build the TransactionMetaMessage that the coordinator returns from the COMMITTED meta.
+ TransactionMetaMessage metaMessage =
committedMeta.toTransactionMetaMessage(
+ new ReplicaMessagesFactory(),
+ TX_MESSAGES_FACTORY
+ );
+
+ TxStateResponse response = TX_MESSAGES_FACTORY.txStateResponse()
+ .txStateMeta(metaMessage)
+ .timestamp(timestamp)
+ .build();
+
+
when(txMessageSender.resolveTxStateFromCoordinator(any(InternalClusterNode.class),
eq(txId), any(HybridTimestamp.class)))
+ .thenReturn(completedFuture(response));
+
+ CompletableFuture<TransactionMeta> result = resolver.resolveTxState(
+ txId,
+ commitPartitionId,
+ timestamp
+ );
+
+ assertThat(result, willCompleteSuccessfully()); // Only successful completion is asserted, not the resolved state.
+
+ // Verify that the coordinator request was addressed to the InternalClusterNode returned by the node resolver.
+ ArgumentCaptor<InternalClusterNode> nodeCaptor =
ArgumentCaptor.forClass(InternalClusterNode.class);
+
verify(txMessageSender).resolveTxStateFromCoordinator(nodeCaptor.capture(),
eq(txId), any(HybridTimestamp.class));
+ assertEquals(COORDINATOR_NODE, nodeCaptor.getValue());
+
+ // Verify that the commit partition fallback was NOT called.
+ verify(txMessageSender, never())
+ .resolveTxStateFromCommitPartition(any(String.class),
eq(txId), eq(commitPartitionId), any(Long.class));
+
+ // Verify that the transaction meta was updated (exactly once, the default verification mode).
+ verify(txManager).updateTxMeta(eq(txId), any());
+ }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 265f3488b79..55c6281a3be 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -526,7 +526,6 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
* Tests uncaught exception in the closure.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26669")
public void testTxClosureUncaughtExceptionAsync() {
double balance = 10.;
double delta = 50.;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 7fb40c0f2d8..9d1c3945dcf 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -207,18 +208,18 @@ public class TxMessageSender {
/**
* Send TxStateCoordinatorRequest.
*
- * @param primaryConsistentId Node id to send the request to.
+ * @param coordinatorClusterNode Node to send the request to.
* @param txId Transaction id.
* @param timestamp Timestamp to pass to target node.
* @return Completable future of {@link TxStateResponse}.
*/
public CompletableFuture<TxStateResponse> resolveTxStateFromCoordinator(
- String primaryConsistentId,
+ InternalClusterNode coordinatorClusterNode,
UUID txId,
HybridTimestamp timestamp
) {
return messagingService.invoke(
- primaryConsistentId,
+ coordinatorClusterNode,
TX_MESSAGES_FACTORY.txStateCoordinatorRequest()
.readTimestamp(timestamp)
.txId(txId)