This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ce004dc6c3 IGNITE-27617 Ensure directly mapped client transaction 
write intents are cleaned up
6ce004dc6c3 is described below

commit 6ce004dc6c375511ec06c1d0c3b573ca3f1f7f6e
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Jan 21 16:16:49 2026 +0300

    IGNITE-27617 Ensure directly mapped client transaction write intents are 
cleaned up
---
 .../client/proto/ProtocolBitmaskFeature.java       |   7 +-
 .../ignite/client/handler/ItClientHandlerTest.java |   1 +
 .../ignite/client/handler/ClientHandlerModule.java |   3 +-
 .../handler/ClientInboundMessageHandler.java       |   5 +-
 .../tx/ClientTransactionCommitRequest.java         |  10 +-
 .../tx/ClientTransactionRollbackRequest.java       |  14 +-
 .../client/ClientTransactionInflights.java         |  10 +
 .../ignite/internal/client/TcpClientChannel.java   |   3 +-
 .../internal/client/tx/ClientTransaction.java      |   3 +
 .../apache/ignite/client/fakes/FakeTxManager.java  |   1 +
 .../partition/replicator/TxRecoveryEngine.java     |   1 +
 .../internal/client/ItClientDirectMappingTest.java | 231 ++++++++++++++++-----
 .../ignite/internal/table/ItColocationTest.java    |   1 +
 .../replication/PartitionReplicaListenerTest.java  |   2 +-
 .../ZonePartitionReplicaListenerTest.java          |   6 +-
 .../distributed/storage/InternalTableImplTest.java |   4 +-
 .../tx/distributed/ItTransactionRecoveryTest.java  |   1 +
 .../org/apache/ignite/internal/tx/TxManager.java   |   2 +
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  15 ++
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   3 +-
 .../tx/impl/ReadWriteTransactionImplTest.java      |   9 +-
 21 files changed, 257 insertions(+), 75 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 8a4caa9a0ef..3e9e2417434 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -97,7 +97,12 @@ public enum ProtocolBitmaskFeature {
     /**
      * Compute tasks and jobs accept observable timestamp from the client.
      */
-    COMPUTE_OBSERVABLE_TS(14);
+    COMPUTE_OBSERVABLE_TS(14),
+
+    /**
+     * Send remote writes flag for directly mapped transactions.
+     */
+    TX_DIRECT_MAPPING_SEND_REMOTE_WRITES(15);
 
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
             EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index b8ad0f0e118..cc5d65f275b 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -560,6 +560,7 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             expected.set(12);
             expected.set(13);
             expected.set(14);
+            expected.set(15);
             assertEquals(expected, supportedFeatures);
 
             var extensionsLen = unpacker.unpackInt();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 9389c76898a..0bab18395fe 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -98,7 +98,8 @@ public class ClientHandlerModule implements IgniteComponent, 
PlatformComputeTran
             ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
             ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
             ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
-            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
+            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
+            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
     ));
 
     /** Connection id generator.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a083b696403..8b89bbfb58a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -949,11 +950,11 @@ public class ClientInboundMessageHandler
 
             case ClientOp.TX_COMMIT:
                 return ClientTransactionCommitRequest.process(in, resources, 
metrics, clockService, igniteTables,
-                        clientContext.hasFeature(TX_PIGGYBACK), tsTracker);
+                        clientContext.hasFeature(TX_PIGGYBACK), 
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);
 
             case ClientOp.TX_ROLLBACK:
                 return ClientTransactionRollbackRequest.process(in, resources, 
metrics, igniteTables,
-                        clientContext.hasFeature(TX_PIGGYBACK));
+                        clientContext.hasFeature(TX_PIGGYBACK), 
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));
 
             case ClientOp.COMPUTE_EXECUTE:
                 return ClientComputeExecuteRequest.process(in, compute, 
clusterService, notificationSender(requestId), clientContext);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 161281673eb..88896e09cc5 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.tx.TransactionException;
 
@@ -52,6 +53,7 @@ public class ClientTransactionCommitRequest {
      * @param clockService Clock service.
      * @param igniteTables Tables.
      * @param enableDirectMapping Enable direct mapping flag.
+     * @param sendRemoteWritesFlag Send remote writes flag.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -61,6 +63,7 @@ public class ClientTransactionCommitRequest {
             ClockService clockService,
             IgniteTablesInternal igniteTables,
             boolean enableDirectMapping,
+            boolean sendRemoteWritesFlag,
             HybridTimestampTracker tsTracker
     ) throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
@@ -83,8 +86,13 @@ public class ClientTransactionCommitRequest {
             if (cnt > 0) {
                 long causality = in.unpackLong();
 
-                // Update causality.
+                // Update causality. Used to assign commit timestamp after all 
enlistments.
                 
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
+
+                ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+
+                // Enforce cleanup.
+                tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
             }
 
             Exception ex = null;
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index 1ea53aad501..c6492dfa99a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -28,6 +28,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 
 /**
  * Client transaction rollback request.
@@ -41,6 +42,7 @@ public class ClientTransactionRollbackRequest {
      * @param metrics Metrics.
      * @param igniteTables Tables facade.
      * @param enableDirectMapping Enable direct mapping.
+     * @param sendRemoteWritesFlag Send remote writes flag.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -48,7 +50,8 @@ public class ClientTransactionRollbackRequest {
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
             IgniteTablesInternal igniteTables,
-            boolean enableDirectMapping
+            boolean enableDirectMapping,
+            boolean sendRemoteWritesFlag
     )
             throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
@@ -70,6 +73,15 @@ public class ClientTransactionRollbackRequest {
                     merge(table.internalTable(), partId, consistentId, token, 
tx, false);
                 }
             }
+
+            if (cnt > 0) {
+                in.unpackLong(); // Unpack causality.
+
+                ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+
+                // Enforce cleanup.
+                tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
+            }
         }
 
         return tx.rollbackAsync().handle((res, err) -> {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
index 74b5512cd90..fbfeb1d9611 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
@@ -125,6 +125,16 @@ public class ClientTransactionInflights {
         txCtxMap.remove(uuid);
     }
 
+    /**
+     * Check if the inflights map contains a given transaction.
+     *
+     * @param txId Tx id.
+     * @return {@code True} if contains.
+     */
+    public boolean contains(UUID txId) {
+        return txCtxMap.containsKey(txId);
+    }
+
     /**
      * Transaction inflights context.
      */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index b8ce92e43c5..b441fa19a7e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -99,7 +99,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
             ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
             ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
-            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
+            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
+            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
     ));
 
     /** Minimum supported heartbeat interval. */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 74a7d8c3ac4..d1c31601921 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -314,6 +314,9 @@ public class ClientTransaction implements Transaction {
 
         if (cnt > 0) {
             w.out().packLong(tracker.get().longValue());
+
+            // Send information about directly mapped writes to ensure a 
proper cleanup algorithm is chosen.
+            w.out().packBoolean(!ch.inflights().contains(txId));
         }
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index d12b1de5e91..2f0b4851375 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -234,6 +234,7 @@ public class FakeTxManager implements TxManager {
             boolean commitIntent,
             boolean timeoutExceeded,
             boolean recovery,
+            boolean noRemoteWrites,
             Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
             UUID txId
     ) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
index b2d452f0794..87706ef4cc1 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
@@ -70,6 +70,7 @@ public class TxRecoveryEngine {
                         false,
                         false,
                         true,
+                        false,
                         Map.of(replicationGroupId, 
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
                         txId
                 )
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
index b671b25ca24..ad39ceb7677 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
@@ -19,26 +19,54 @@ package org.apache.ignite.internal.client;
 
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
-import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTx;
-import static 
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTxVoid;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.apache.ignite.InitParametersBuilder;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Test direct mapping for client's transactions.
  */
-public class ItClientDirectMappingTest extends ClusterPerTestIntegrationTest {
+public class ItClientDirectMappingTest extends ClusterPerClassIntegrationTest {
     /** Table name. */
     private static final String TABLE_NAME = "test_table";
 
+    private static final String ZONE_NAME = "test_zone";
+
+    /** Table name 2. */
+    private static final String TABLE_NAME_2 = "test_table_2";
+
+    private static final String ZONE_NAME2 = "test_zone2";
+
+    protected static final String COLUMN_KEY = "key";
+
+    protected static final String COLUMN_VAL = "val";
+
+    protected static final int PARTITIONS = 10;
+
     /** Nodes bootstrap configuration pattern. */
     private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
             + "  network: {\n"
@@ -53,66 +81,116 @@ public class ItClientDirectMappingTest extends 
ClusterPerTestIntegrationTest {
             + "  failureHandler.dumpThreadsOnFailure: false\n"
             + "}";
 
-    @BeforeEach
-    public void setup() throws Exception {
-        String zoneSql = "create zone test_zone with partitions=5, replicas=1, 
storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
-        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) zone TEST_ZONE";
+    @BeforeAll
+    public static void setup() throws Exception {
+        String zoneSql = "create zone " + ZONE_NAME + " with partitions=" + 
PARTITIONS + ", replicas=2, storage_profiles='"
+                + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) zone " + ZONE_NAME;
+        String zoneSql2 = "create zone " + ZONE_NAME2 + " with partitions=" + 
PARTITIONS + ", replicas=2, storage_profiles='"
+                + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
+        String sql2 = "create table " + TABLE_NAME_2 + " (key int primary key, 
val varchar(20)) zone " + ZONE_NAME2;
 
-        cluster.doInSession(0, session -> {
+        CLUSTER.doInSession(0, session -> {
             executeUpdate(zoneSql, session);
             executeUpdate(sql, session);
+            executeUpdate(zoneSql2, session);
+            executeUpdate(sql2, session);
         });
     }
 
-    @Override
-    protected void customizeInitParameters(InitParametersBuilder builder) {
-        super.customizeInitParameters(builder);
-
-        builder.clusterConfiguration("ignite {"
-                + "  transaction: {"
-                + "      readOnlyTimeoutMillis: 30000,"
-                + "      readWriteTimeoutMillis: 30000"
-                + "  },"
-                + "  replication: {"
-                + "      rpcTimeoutMillis: 30000"
-                + "  },"
-                + "}");
-    }
-
-    @Test
-    public void testBasicImplicit() {
-        try (IgniteClient client0 = clientConnectedToNode(0)) {
-            ClientTable table = (ClientTable) 
client0.tables().table(TABLE_NAME);
-            KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
-
-            Tuple key = Tuple.create().set("key", 0);
-            Tuple val = Tuple.create().set("val", "test0");
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, 2})
+    public void testReadOnCoordinatorWithDirectWrite(int testMode) {
+        TestMode mode = TestMode.values()[testMode];
+        boolean commit = mode == TestMode.COMMIT || mode == TestMode.COMPAT;
+        if (mode == TestMode.COMPAT) {
+            BitSet features = IgniteTestUtils.getFieldValue(null, 
TcpClientChannel.class, "SUPPORTED_FEATURES");
+            
features.clear(ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES.featureId());
+        }
 
-            kvView.put(null, key, val);
-            Tuple val0 = kvView.get(null, key);
-            assertTrue(Tuple.equals(val, val0));
+        try (IgniteClient client = clientConnectedToAllNodes()) {
+            Table table1 = client.tables().table(TABLE_NAME);
+            KeyValueView<Tuple, Tuple> view1 = table1.keyValueView();
+            Table table2 = client.tables().table(TABLE_NAME_2);
+            KeyValueView<Tuple, Tuple> view2 = table2.keyValueView();
+
+            Map<Partition, ClusterNode> map1 = 
table1.partitionDistribution().primaryReplicasAsync().join();
+            Map<Integer, ClusterNode> mapPartById1 = 
map1.entrySet().stream().collect(Collectors.toMap(
+                    entry -> Math.toIntExact(entry.getKey().id()),
+                    Entry::getValue
+            ));
+
+            Map<Partition, ClusterNode> map2 = 
table2.partitionDistribution().primaryReplicasAsync().join();
+            Map<Integer, ClusterNode> mapPartById2 = 
map2.entrySet().stream().collect(Collectors.toMap(
+                    entry -> Math.toIntExact(entry.getKey().id()),
+                    Entry::getValue
+            ));
+
+            // Find a partition which mapped to different primaries.
+            int targetPart = -1;
+
+            for (int i = 0; i < PARTITIONS; i++) {
+                ClusterNode node1 = mapPartById1.get(i);
+                ClusterNode node2 = mapPartById2.get(i);
+
+                if (!node1.equals(node2)) {
+                    targetPart = i;
+                    break;
+                }
+            }
+
+            if (targetPart == -1) {
+                log.warn("Skipping test due to bad assignment");
+                return;
+            }
+
+            log.info("DBG: using partition " + targetPart);
+
+            // Tables have the same structure, can reuse keys.
+            List<Tuple> keys = generateKeysForPartition(commit ? 0 : 100, 1, 
map1, targetPart, table1);
+
+            ClientLazyTransaction tx = (ClientLazyTransaction) 
client.transactions().begin(new TransactionOptions().readOnly(false));
+
+            Tuple key = keys.get(0);
+            // Enlist read operation on coordinator (proxy mode).
+            if (view2.get(tx, key) != null) {
+                fail("Should never happen");
+
+                return;
+            }
+
+            // Enlist write operation on other node (direct mode).
+            view1.put(tx, key, val("test" + key.intValue(0)));
+            if (commit) {
+                tx.commit();
+            } else {
+                tx.rollback();
+            }
+
+            if (commit) {
+                assertNotNull(view1.get(null, key), "key=" + key);
+            } else {
+                assertNull(view1.get(null, key), "key=" + key);
+            }
+        } finally {
+            if (mode == TestMode.COMPAT) {
+                BitSet features = IgniteTestUtils.getFieldValue(null, 
TcpClientChannel.class, "SUPPORTED_FEATURES");
+                
features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES.featureId());
+            }
         }
     }
 
-    @Test
-    public void testBasicExplicit() {
-        try (IgniteClient client0 = clientConnectedToNode(0)) {
-            ClientTable table = (ClientTable) 
client0.tables().table(TABLE_NAME);
-            KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
-
-            Tuple key = Tuple.create().set("key", 0);
-            Tuple val = Tuple.create().set("val", "test0");
+    private IgniteClient clientConnectedToAllNodes() {
+        Builder builder = IgniteClient.builder();
+        String[] addresses = new String[initialNodes()];
 
-            withTxVoid(client0.transactions(), tx -> kvView.put(tx, key, val));
-            Tuple val0 = withTx(client0.transactions(), tx -> kvView.get(tx, 
key));
-            assertTrue(Tuple.equals(val, val0));
+        for (int i = 0; i < initialNodes(); i++) {
+            addresses[i] = "localhost:" + igniteImpl(i).clientAddress().port();
         }
-    }
 
-    private IgniteClient clientConnectedToNode(int nodeIndex) {
-        return IgniteClient.builder()
-                .addresses("localhost:" + 
igniteImpl(nodeIndex).clientAddress().port())
-                .build();
+        builder.addresses(addresses);
+
+        return builder.build();
     }
 
     /**
@@ -129,4 +207,43 @@ public class ItClientDirectMappingTest extends 
ClusterPerTestIntegrationTest {
     protected int initialNodes() {
         return 2;
     }
+
+    private static List<Tuple> generateKeysForPartition(
+            int start,
+            int count,
+            Map<Partition, ClusterNode> map,
+            int partId,
+            Table table
+    ) {
+        List<Tuple> keys = new ArrayList<>();
+        PartitionDistribution partitionManager = table.partitionDistribution();
+
+        int k = start;
+        while (keys.size() != count) {
+            k++;
+            Tuple t = key(k);
+
+            Partition part = partitionManager.partitionAsync(t).orTimeout(5, 
TimeUnit.SECONDS).join();
+
+            if (part.id() == partId) {
+                keys.add(t);
+            }
+        }
+
+        return keys;
+    }
+
+    private static Tuple val(String v) {
+        return Tuple.create().set(COLUMN_VAL, v);
+    }
+
+    private static Tuple key(Integer k) {
+        return Tuple.create().set(COLUMN_KEY, k);
+    }
+
+    enum TestMode {
+        COMMIT,
+        ROLLBACK,
+        COMPAT
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index accc9b66b48..fd8bbc1477d 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -229,6 +229,7 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                     boolean commitIntent,
                     boolean timeoutExceeded,
                     boolean recovery,
+                    boolean noRemoteWrites,
                     Map<ZonePartitionId, PendingTxPartitionEnlistment> 
enlistedGroups,
                     UUID txId
             ) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dee8e3fee0e..8fc68517c4c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -2030,7 +2030,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
 
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any());
+                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).cleanup(any(), anyString(), any());
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 0ec3a7bfa22..ecffcacf052 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -967,7 +967,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
         doAnswer(invocation -> 
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
 
         doAnswer(invocation -> nullCompletedFuture())
-                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any());
+                .when(txManager).finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
         doAnswer(invocation -> nullCompletedFuture())
                 .when(txManager).cleanup(any(), anyString(), any());
     }
@@ -1345,7 +1345,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Test
     public void testTxStateReplicaRequestEmptyState() throws Exception {
         doAnswer(invocation -> {
-            UUID txId = invocation.getArgument(6);
+            UUID txId = invocation.getArgument(7);
 
             txManager.updateTxMeta(txId, old -> TxStateMeta.builder(ABORTED)
                     .txCoordinatorId(localNode.id())
@@ -1354,7 +1354,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
             );
 
             return nullCompletedFuture();
-        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), any(), any());
+        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any());
 
         CompletableFuture<ReplicaResult> fut = 
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                 .groupId(zonePartitionIdMessage(grpId))
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index bd55a59e981..f96ab30e4c0 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -164,7 +164,7 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                     );
                 });
 
-        lenient().when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any()))
+        lenient().when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any()))
                 .thenReturn(nullCompletedFuture());
 
         // Mock for creating implicit transactions when null is passed
@@ -388,7 +388,7 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
     private Map<ZonePartitionId, PendingTxPartitionEnlistment> 
extractEnlistmentsFromTxFinish() {
         ArgumentCaptor<Map<ZonePartitionId, PendingTxPartitionEnlistment>> 
enlistmentsCaptor = ArgumentCaptor.captor();
 
-        verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), enlistmentsCaptor.capture(), any());
+        verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
anyBoolean(), anyBoolean(), enlistmentsCaptor.capture(), any());
 
         return enlistmentsCaptor.getValue();
     }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index ab88e3c864b..9736942803d 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -738,6 +738,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
                 false,
                 false,
                 true,
+                false,
                 Map.of(commitPartition, new 
PendingTxPartitionEnlistment(txCrdNode2.node().name(), 0L)),
                 rwTx1Id
         );
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 57e7525bb5d..bab8f783c8e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -183,6 +183,7 @@ public interface TxManager extends IgniteComponent {
      * @param commitIntent {@code true} if a commit requested.
      * @param timeoutExceeded {@code true} if a timeout exceeded.
      * @param recovery {@code true} if finished by recovery.
+     * @param noRemoteWrites {@code true} if remote(directly mapped) part of 
this transaction has no writes.
      * @param enlistedGroups Map of enlisted partitions.
      * @param txId Transaction id.
      */
@@ -192,6 +193,7 @@ public interface TxManager extends IgniteComponent {
             boolean commitIntent,
             boolean timeoutExceeded,
             boolean recovery,
+            boolean noRemoteWrites,
             Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
             UUID txId
     );
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 6e58b3dd4f3..2cefc8e2756 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -64,6 +64,11 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      */
     private boolean killed;
 
+    /**
+     * {@code True} if a remote(directly mapped) part of this transaction has 
no writes.
+     */
+    private boolean noRemoteWrites = true;
+
     /**
      * Constructs an explicit read-write transaction.
      *
@@ -240,6 +245,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                             commit,
                             timeoutExceeded,
                             false,
+                            noRemoteWrites,
                             enlisted,
                             id()
                     );
@@ -310,4 +316,13 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
         // Thread safety is not needed.
         finishFuture = failedFuture(e);
     }
+
+    /**
+     * Set no remote writes flag.
+     *
+     * @param noRemoteWrites The value.
+     */
+    public void noRemoteWrites(boolean noRemoteWrites) {
+        this.noRemoteWrites = noRemoteWrites;
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 374b4e11cbf..6f25ad4e6b1 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -636,6 +636,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
             boolean commitIntent,
             boolean timeout,
             boolean recovery,
+            boolean noRemoteWrites,
             Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
             UUID txId
     ) {
@@ -700,7 +701,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                         enlistedGroups,
                         txId,
                         finishingStateMeta.txFinishFuture(),
-                        txContext.isNoWrites() && !recovery
+                        txContext.isNoWrites() && noRemoteWrites && !recovery
                 )
         ).whenComplete((unused, throwable) -> {
             if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 349cafd4efd..8a4c1a16864 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -94,11 +94,12 @@ class ReadWriteTransactionImplTest extends 
BaseIgniteAbstractTest {
     private void startTxAndTryToEnlist(boolean commit) {
         HashSet<UUID> finishedTxs = new HashSet<>();
 
-        Mockito.when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), any(), any())).thenAnswer(invocation -> {
-            finishedTxs.add(invocation.getArgument(6));
+        Mockito.when(txManager.finish(any(), any(), anyBoolean(), 
anyBoolean(), anyBoolean(), anyBoolean(), any(), any()))
+                .thenAnswer(invocation -> {
+                    finishedTxs.add(invocation.getArgument(7));
 
-            return nullCompletedFuture();
-        });
+                    return nullCompletedFuture();
+                });
 
         Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> {
             if (finishedTxs.contains(invocation.getArgument(0))) {


Reply via email to