This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bc8e96cd144 IGNITE-28215 Fix transaction context loss in DirectTxUtils (#7814)
bc8e96cd144 is described below

commit bc8e96cd1447ed509a0069b7c73a097995df4cfe
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Mar 20 10:44:12 2026 +0100

    IGNITE-28215 Fix transaction context loss in DirectTxUtils (#7814)
    
    Use tx.channel() when possible and don't resolve by name to avoid channel mismatch.
---
 .../internal/client/tx/ClientLazyTransaction.java  |   4 +
 .../internal/client/tx/ClientTransaction.java      |  11 ++
 .../ignite/internal/client/tx/DirectTxUtils.java   |  37 +++--
 .../internal/client/tx/DirectTxUtilsTest.java      | 160 +++++++++++++++++++++
 .../app/client/ItThinClientTransactionsTest.java   |   3 +-
 5 files changed, 204 insertions(+), 11 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 1b2d70f3a21..70bf1765f9f 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -187,6 +187,10 @@ public class ClientLazyTransaction implements Transaction {
         var tx0 = tx;
 
         if (tx0 != null) {
+            if (tx0.isDone()) {
+                tx0.join().validateOwnership(ch);
+            }
+
             return new IgniteBiTuple<>(tx0, false);
         }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index e910a47f1e7..eaab408ace6 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -710,4 +710,15 @@ public class ClientTransaction implements Transaction {
     public boolean killed() {
         return state.get() == STATE_KILLED;
     }
+
+    /**
+     * Validates transaction ownership.
+     *
+     * @param clientCh Client channel.
+     */
+    void validateOwnership(ReliableChannel clientCh) {
+        if (clientCh != reliableChannel) {
+            throw new IllegalArgumentException("Transaction belongs to a 
different client instance");
+        }
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 4932455ad5a..811a5689593 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -49,6 +49,8 @@ import org.jetbrains.annotations.Nullable;
  * Collection of helper methods to unify handling of direct transactions and 
piggybacking of tx start request.
  */
 public class DirectTxUtils {
+    private static final String TRANSACTION_CONTEXT_LOST = "Transaction 
context has been lost due to connection errors.";
+
     /**
      * Ensures that a client-side transaction is started and ready to serve 
requests.
      *
@@ -171,7 +173,7 @@ public class DirectTxUtils {
                 //noinspection resource
                 if (tx0.channel() != out.clientChannel()) {
                     // Do not throw IgniteClientConnectionException to avoid 
retry kicking in.
-                    throw new IgniteException(CONNECTION_ERR, "Transaction 
context has been lost due to connection errors.");
+                    throw new IgniteException(CONNECTION_ERR, 
TRANSACTION_CONTEXT_LOST);
                 }
 
                 out.out().packLong(tx0.id());
@@ -262,9 +264,11 @@ public class DirectTxUtils {
             @Nullable ClientTransaction tx,
             @Nullable PartitionMapping mapping
     ) {
-        CompletableFuture<ClientChannel> chFuture = ctx.firstReqFut != null
-                ? completedFuture(ctx.channel)
-                : ch.getChannelAsync(resolvePreferredNode(tx, mapping));
+        if (tx != null) {
+            tx.validateOwnership(ch);
+        }
+
+        CompletableFuture<ClientChannel> chFuture = resolveChannelInner(ctx, 
ch, tx, mapping);
 
         return chFuture.thenCompose(opCh -> {
             if (tx != null && tx.hasCommitPartition()
@@ -278,14 +282,29 @@ public class DirectTxUtils {
         });
     }
 
-    private static @Nullable String resolvePreferredNode(@Nullable 
ClientTransaction tx, @Nullable PartitionMapping pm) {
-        String opNode = pm == null ? null : pm.nodeConsistentId();
+    private static CompletableFuture<ClientChannel> resolveChannelInner(
+            WriteContext ctx,
+            ReliableChannel ch,
+            @Nullable ClientTransaction tx,
+            @Nullable PartitionMapping mapping) {
+        if (ctx.firstReqFut != null) {
+            return completedFuture(ctx.channel);
+        }
+
+        String opNode = mapping == null ? null : mapping.nodeConsistentId();
 
         if (tx != null) {
-            return !tx.isReadOnly() && tx.hasCommitPartition() && opNode != 
null ? opNode : tx.nodeName();
-        } else {
-            return opNode;
+            if (tx.isReadOnly() || !tx.hasCommitPartition() || opNode == null) 
{
+                if (tx.channel().closed()) {
+                    // Do not throw IgniteClientConnectionException to avoid 
retry kicking in.
+                    throw new IgniteException(CONNECTION_ERR, 
TRANSACTION_CONTEXT_LOST);
+                }
+
+                return completedFuture(tx.channel());
+            }
         }
+
+        return ch.getChannelAsync(opNode);
     }
 
     private static CompletableFuture<Void> enlistDirect(
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
new file mode 100644
index 00000000000..c6eb62c5bca
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.tx;
+
+import static java.util.UUID.randomUUID;
+import static 
org.apache.ignite.internal.hlc.HybridTimestampTracker.EMPTY_TS_PROVIDER;
+import static 
org.apache.ignite.internal.hlc.HybridTimestampTracker.emptyTracker;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.ClientClusterNode;
+import org.apache.ignite.internal.client.ClientTransactionInflights;
+import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.ProtocolContext;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.WriteContext;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.jspecify.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link DirectTxUtils}.
+ */
+@SuppressWarnings("resource")
+public class DirectTxUtilsTest extends BaseIgniteAbstractTest {
+    @Test
+    void resolveChannelReturnsTxChannelForReadOnlyTx() {
+        ReliableChannel ch = mock(ReliableChannel.class);
+        ClientChannel txChannel = mockClientChannel("node1");
+
+        WriteContext ctx = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_GET);
+
+        // Read-only tx with mapping pointing to a different node.
+        ClientTransaction roTx = createTx(txChannel, ch, true, null);
+        PartitionMapping mapping = new PartitionMapping(1, "node2", 0);
+
+        CompletableFuture<ClientChannel> result = 
DirectTxUtils.resolveChannel(ctx, ch, false, roTx, mapping);
+
+        assertSame(txChannel, result.join());
+        verify(ch, never()).getChannelAsync(Mockito.any());
+    }
+
+    @Test
+    void resolveChannelReturnsTxChannelWhenNoCommitPartition() {
+        ReliableChannel ch = mock(ReliableChannel.class);
+        ClientChannel txChannel = mockClientChannel("node1");
+
+        // RW tx without commit partition.
+        ClientTransaction tx = createTx(txChannel, ch, false, null);
+
+        WriteContext ctx = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_UPSERT);
+        PartitionMapping mapping = new PartitionMapping(1, "node2", 0);
+
+        CompletableFuture<ClientChannel> result = 
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+        assertSame(txChannel, result.join());
+        verify(ch, never()).getChannelAsync(Mockito.any());
+    }
+
+    @Test
+    void resolveChannelReturnsTxChannelWhenSameNode() {
+        ReliableChannel ch = mock(ReliableChannel.class);
+        ClientChannel txChannel = mockClientChannel("node1");
+
+        
when(ch.getChannelAsync("node1")).thenReturn(CompletableFuture.completedFuture(txChannel));
+
+        // RW tx with commit partition, mapping points to the same node as the 
tx coordinator.
+        PartitionMapping commitPm = new PartitionMapping(1, "node1", 0);
+        ClientTransaction tx = createTx(txChannel, ch, false, commitPm);
+
+        WriteContext ctx = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_UPSERT);
+        PartitionMapping mapping = new PartitionMapping(2, "node1", 0);
+
+        CompletableFuture<ClientChannel> result = 
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+        assertSame(txChannel, result.join());
+    }
+
+    @Test
+    void resolveChannelUsesGetChannelAsyncForDifferentNode() {
+        ReliableChannel ch = mock(ReliableChannel.class);
+        when(ch.inflights()).thenReturn(new ClientTransactionInflights());
+
+        ClientChannel txChannel = mockClientChannel("node1");
+        ClientChannel otherChannel = mockClientChannel("node2");
+
+        
when(ch.getChannelAsync("node2")).thenReturn(CompletableFuture.completedFuture(otherChannel));
+
+        // RW tx with commit partition, mapping points to a different node.
+        PartitionMapping commitPm = new PartitionMapping(1, "node1", 0);
+        ClientTransaction tx = createTx(txChannel, ch, false, commitPm);
+
+        WriteContext ctx = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_UPSERT);
+        PartitionMapping mapping = new PartitionMapping(2, "node2", 0);
+
+        CompletableFuture<ClientChannel> result = 
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+        assertSame(otherChannel, result.join());
+        verify(ch).getChannelAsync("node2");
+    }
+
+    @Test
+    void resolveChannelThrowsForDifferentReliableChannel() {
+        ReliableChannel ch1 = mock(ReliableChannel.class);
+        ReliableChannel ch2 = mock(ReliableChannel.class);
+        ClientChannel txChannel = mockClientChannel("node1");
+
+        ClientTransaction tx = createTx(txChannel, ch1, false, null);
+
+        WriteContext ctx = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_UPSERT);
+
+        assertThrows(IllegalArgumentException.class, () -> 
DirectTxUtils.resolveChannel(ctx, ch2, true, tx, null));
+    }
+
+    @SuppressWarnings("DataFlowIssue")
+    private static ClientChannel mockClientChannel(String nodeName) {
+        ClientChannel channel = mock(ClientChannel.class);
+        ProtocolContext protocolContext = mock(ProtocolContext.class);
+        ClientClusterNode clusterNode = new ClientClusterNode(randomUUID(), 
nodeName, null);
+
+        when(channel.protocolContext()).thenReturn(protocolContext);
+        when(protocolContext.clusterNode()).thenReturn(clusterNode);
+
+        return channel;
+    }
+
+    private static ClientTransaction createTx(
+            ClientChannel channel,
+            ReliableChannel reliableChannel,
+            boolean readOnly,
+            @Nullable PartitionMapping commitPartitionMapping
+    ) {
+        return new ClientTransaction(
+                channel, reliableChannel, 1, readOnly, randomUUID(),
+                commitPartitionMapping, randomUUID(), EMPTY_TS_PROVIDER, 0);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index ae2633c88c8..44209e6c62a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -336,8 +336,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
             var ex = assertThrows(IgniteException.class, () -> 
recordView.upsert(tx, Tuple.create()));
 
-            assertThat(ex.getMessage(), containsString("Transaction context 
has been lost due to connection errors"));
-            assertEquals(ErrorGroups.Client.CONNECTION_ERR, ex.code());
+            assertThat(ex.getMessage(), containsString("Transaction belongs to 
a different client instance"));
         }
     }
 

Reply via email to