This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bc8e96cd144 IGNITE-28215 Fix transaction context loss in DirectTxUtils
(#7814)
bc8e96cd144 is described below
commit bc8e96cd1447ed509a0069b7c73a097995df4cfe
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Mar 20 10:44:12 2026 +0100
IGNITE-28215 Fix transaction context loss in DirectTxUtils (#7814)
Use tx.channel() when possible instead of resolving the channel by node
name, to avoid a channel mismatch.
---
.../internal/client/tx/ClientLazyTransaction.java | 4 +
.../internal/client/tx/ClientTransaction.java | 11 ++
.../ignite/internal/client/tx/DirectTxUtils.java | 37 +++--
.../internal/client/tx/DirectTxUtilsTest.java | 160 +++++++++++++++++++++
.../app/client/ItThinClientTransactionsTest.java | 3 +-
5 files changed, 204 insertions(+), 11 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 1b2d70f3a21..70bf1765f9f 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -187,6 +187,10 @@ public class ClientLazyTransaction implements Transaction {
var tx0 = tx;
if (tx0 != null) {
+ if (tx0.isDone()) {
+ tx0.join().validateOwnership(ch);
+ }
+
return new IgniteBiTuple<>(tx0, false);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index e910a47f1e7..eaab408ace6 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -710,4 +710,15 @@ public class ClientTransaction implements Transaction {
public boolean killed() {
return state.get() == STATE_KILLED;
}
+
+ /**
+ * Validates transaction ownership.
+ *
+ * @param clientCh Client channel.
+ */
+ void validateOwnership(ReliableChannel clientCh) {
+ if (clientCh != reliableChannel) {
+ throw new IllegalArgumentException("Transaction belongs to a
different client instance");
+ }
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 4932455ad5a..811a5689593 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -49,6 +49,8 @@ import org.jetbrains.annotations.Nullable;
* Collection of helper methods to unify handling of direct transactions and
piggybacking of tx start request.
*/
public class DirectTxUtils {
+ private static final String TRANSACTION_CONTEXT_LOST = "Transaction
context has been lost due to connection errors.";
+
/**
* Ensures that a client-side transaction is started and ready to serve
requests.
*
@@ -171,7 +173,7 @@ public class DirectTxUtils {
//noinspection resource
if (tx0.channel() != out.clientChannel()) {
// Do not throw IgniteClientConnectionException to avoid
retry kicking in.
- throw new IgniteException(CONNECTION_ERR, "Transaction
context has been lost due to connection errors.");
+ throw new IgniteException(CONNECTION_ERR,
TRANSACTION_CONTEXT_LOST);
}
out.out().packLong(tx0.id());
@@ -262,9 +264,11 @@ public class DirectTxUtils {
@Nullable ClientTransaction tx,
@Nullable PartitionMapping mapping
) {
- CompletableFuture<ClientChannel> chFuture = ctx.firstReqFut != null
- ? completedFuture(ctx.channel)
- : ch.getChannelAsync(resolvePreferredNode(tx, mapping));
+ if (tx != null) {
+ tx.validateOwnership(ch);
+ }
+
+ CompletableFuture<ClientChannel> chFuture = resolveChannelInner(ctx,
ch, tx, mapping);
return chFuture.thenCompose(opCh -> {
if (tx != null && tx.hasCommitPartition()
@@ -278,14 +282,29 @@ public class DirectTxUtils {
});
}
- private static @Nullable String resolvePreferredNode(@Nullable
ClientTransaction tx, @Nullable PartitionMapping pm) {
- String opNode = pm == null ? null : pm.nodeConsistentId();
+ private static CompletableFuture<ClientChannel> resolveChannelInner(
+ WriteContext ctx,
+ ReliableChannel ch,
+ @Nullable ClientTransaction tx,
+ @Nullable PartitionMapping mapping) {
+ if (ctx.firstReqFut != null) {
+ return completedFuture(ctx.channel);
+ }
+
+ String opNode = mapping == null ? null : mapping.nodeConsistentId();
if (tx != null) {
- return !tx.isReadOnly() && tx.hasCommitPartition() && opNode !=
null ? opNode : tx.nodeName();
- } else {
- return opNode;
+ if (tx.isReadOnly() || !tx.hasCommitPartition() || opNode == null)
{
+ if (tx.channel().closed()) {
+ // Do not throw IgniteClientConnectionException to avoid
retry kicking in.
+ throw new IgniteException(CONNECTION_ERR,
TRANSACTION_CONTEXT_LOST);
+ }
+
+ return completedFuture(tx.channel());
+ }
}
+
+ return ch.getChannelAsync(opNode);
}
private static CompletableFuture<Void> enlistDirect(
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
new file mode 100644
index 00000000000..c6eb62c5bca
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/tx/DirectTxUtilsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.tx;
+
+import static java.util.UUID.randomUUID;
+import static
org.apache.ignite.internal.hlc.HybridTimestampTracker.EMPTY_TS_PROVIDER;
+import static
org.apache.ignite.internal.hlc.HybridTimestampTracker.emptyTracker;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.ClientClusterNode;
+import org.apache.ignite.internal.client.ClientTransactionInflights;
+import org.apache.ignite.internal.client.PartitionMapping;
+import org.apache.ignite.internal.client.ProtocolContext;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.WriteContext;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.jspecify.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link DirectTxUtils}.
+ */
+@SuppressWarnings("resource")
+public class DirectTxUtilsTest extends BaseIgniteAbstractTest {
+ @Test
+ void resolveChannelReturnsTxChannelForReadOnlyTx() {
+ ReliableChannel ch = mock(ReliableChannel.class);
+ ClientChannel txChannel = mockClientChannel("node1");
+
+ WriteContext ctx = new WriteContext(emptyTracker(),
ClientOp.TUPLE_GET);
+
+ // Read-only tx with mapping pointing to a different node.
+ ClientTransaction roTx = createTx(txChannel, ch, true, null);
+ PartitionMapping mapping = new PartitionMapping(1, "node2", 0);
+
+ CompletableFuture<ClientChannel> result =
DirectTxUtils.resolveChannel(ctx, ch, false, roTx, mapping);
+
+ assertSame(txChannel, result.join());
+ verify(ch, never()).getChannelAsync(Mockito.any());
+ }
+
+ @Test
+ void resolveChannelReturnsTxChannelWhenNoCommitPartition() {
+ ReliableChannel ch = mock(ReliableChannel.class);
+ ClientChannel txChannel = mockClientChannel("node1");
+
+ // RW tx without commit partition.
+ ClientTransaction tx = createTx(txChannel, ch, false, null);
+
+ WriteContext ctx = new WriteContext(emptyTracker(),
ClientOp.TUPLE_UPSERT);
+ PartitionMapping mapping = new PartitionMapping(1, "node2", 0);
+
+ CompletableFuture<ClientChannel> result =
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+ assertSame(txChannel, result.join());
+ verify(ch, never()).getChannelAsync(Mockito.any());
+ }
+
+ @Test
+ void resolveChannelReturnsTxChannelWhenSameNode() {
+ ReliableChannel ch = mock(ReliableChannel.class);
+ ClientChannel txChannel = mockClientChannel("node1");
+
+
when(ch.getChannelAsync("node1")).thenReturn(CompletableFuture.completedFuture(txChannel));
+
+ // RW tx with commit partition, mapping points to the same node as the
tx coordinator.
+ PartitionMapping commitPm = new PartitionMapping(1, "node1", 0);
+ ClientTransaction tx = createTx(txChannel, ch, false, commitPm);
+
+ WriteContext ctx = new WriteContext(emptyTracker(),
ClientOp.TUPLE_UPSERT);
+ PartitionMapping mapping = new PartitionMapping(2, "node1", 0);
+
+ CompletableFuture<ClientChannel> result =
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+ assertSame(txChannel, result.join());
+ }
+
+ @Test
+ void resolveChannelUsesGetChannelAsyncForDifferentNode() {
+ ReliableChannel ch = mock(ReliableChannel.class);
+ when(ch.inflights()).thenReturn(new ClientTransactionInflights());
+
+ ClientChannel txChannel = mockClientChannel("node1");
+ ClientChannel otherChannel = mockClientChannel("node2");
+
+
when(ch.getChannelAsync("node2")).thenReturn(CompletableFuture.completedFuture(otherChannel));
+
+ // RW tx with commit partition, mapping points to a different node.
+ PartitionMapping commitPm = new PartitionMapping(1, "node1", 0);
+ ClientTransaction tx = createTx(txChannel, ch, false, commitPm);
+
+ WriteContext ctx = new WriteContext(emptyTracker(),
ClientOp.TUPLE_UPSERT);
+ PartitionMapping mapping = new PartitionMapping(2, "node2", 0);
+
+ CompletableFuture<ClientChannel> result =
DirectTxUtils.resolveChannel(ctx, ch, true, tx, mapping);
+
+ assertSame(otherChannel, result.join());
+ verify(ch).getChannelAsync("node2");
+ }
+
+ @Test
+ void resolveChannelThrowsForDifferentReliableChannel() {
+ ReliableChannel ch1 = mock(ReliableChannel.class);
+ ReliableChannel ch2 = mock(ReliableChannel.class);
+ ClientChannel txChannel = mockClientChannel("node1");
+
+ ClientTransaction tx = createTx(txChannel, ch1, false, null);
+
+ WriteContext ctx = new WriteContext(emptyTracker(),
ClientOp.TUPLE_UPSERT);
+
+ assertThrows(IllegalArgumentException.class, () ->
DirectTxUtils.resolveChannel(ctx, ch2, true, tx, null));
+ }
+
+ @SuppressWarnings("DataFlowIssue")
+ private static ClientChannel mockClientChannel(String nodeName) {
+ ClientChannel channel = mock(ClientChannel.class);
+ ProtocolContext protocolContext = mock(ProtocolContext.class);
+ ClientClusterNode clusterNode = new ClientClusterNode(randomUUID(),
nodeName, null);
+
+ when(channel.protocolContext()).thenReturn(protocolContext);
+ when(protocolContext.clusterNode()).thenReturn(clusterNode);
+
+ return channel;
+ }
+
+ private static ClientTransaction createTx(
+ ClientChannel channel,
+ ReliableChannel reliableChannel,
+ boolean readOnly,
+ @Nullable PartitionMapping commitPartitionMapping
+ ) {
+ return new ClientTransaction(
+ channel, reliableChannel, 1, readOnly, randomUUID(),
+ commitPartitionMapping, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index ae2633c88c8..44209e6c62a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -336,8 +336,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
var ex = assertThrows(IgniteException.class, () ->
recordView.upsert(tx, Tuple.create()));
- assertThat(ex.getMessage(), containsString("Transaction context
has been lost due to connection errors"));
- assertEquals(ErrorGroups.Client.CONNECTION_ERR, ex.code());
+ assertThat(ex.getMessage(), containsString("Transaction belongs to
a different client instance"));
}
}