This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f0806bd54a IGNITE-27947 Fix rollback for client piggyback tx (#7844)
3f0806bd54a is described below

commit 3f0806bd54ad18c4b9c98870702aff79e2722aa2
Author: Tiago Marques Godinho <[email protected]>
AuthorDate: Wed Apr 22 12:21:17 2026 +0100

    IGNITE-27947 Fix rollback for client piggyback tx (#7844)
    
    Allow rollback for piggyback transactions: piggyback client tx combines 
**transaction start** with **table operation** in one client request. If the 
table operation is stuck on a lock, the client never receives the transaction 
id. Instead, we use the `requestId` (generated on the client) to roll back the 
transaction.
    
    * Server-side:
      * `TX_ROLLBACK` accepts the request id of the first ("piggyback") request 
of a directly mapped TX. The request id is encoded in the negative range of 
`resourceId`.
      * Track `requestId -> txId` to roll back piggyback transactions stuck on 
a lock
      * Update all operations to take the request id and the tracking map. RO 
ops take the same parameters just for consistency.
    * Client-side:
      * Allow multiple `onSent` callbacks on the payload output object.
      * Add info to `ClientLazyTransaction` about the first request in the TX, 
update via `PayloadOutputChannel` on successful request.
      * Implement `TX_ROLLBACK` based on `firstReqId`
---
 .../client/proto/ProtocolBitmaskFeature.java       |   7 +-
 .../ignite/client/handler/ItClientHandlerTest.java |   1 +
 .../ignite/client/handler/ClientHandlerModule.java |   3 +-
 .../handler/ClientInboundMessageHandler.java       | 283 ++++++++++++++++++---
 .../requests/sql/ClientSqlExecuteBatchRequest.java |   6 +-
 .../requests/sql/ClientSqlExecuteRequest.java      |   6 +-
 .../sql/ClientSqlQueryMetadataRequest.java         |  19 +-
 .../handler/requests/table/ClientTableCommon.java  |  40 ++-
 .../table/ClientTupleContainsAllKeysRequest.java   |   7 +-
 .../table/ClientTupleContainsKeyRequest.java       |  10 +-
 .../table/ClientTupleDeleteAllExactRequest.java    |  23 +-
 .../table/ClientTupleDeleteAllRequest.java         |  10 +-
 .../table/ClientTupleDeleteExactRequest.java       |  31 ++-
 .../requests/table/ClientTupleDeleteRequest.java   |  10 +-
 .../requests/table/ClientTupleGetAllRequest.java   |   7 +-
 .../table/ClientTupleGetAndDeleteRequest.java      |  10 +-
 .../table/ClientTupleGetAndReplaceRequest.java     |  23 +-
 .../table/ClientTupleGetAndUpsertRequest.java      |  23 +-
 .../requests/table/ClientTupleGetRequest.java      |  10 +-
 .../table/ClientTupleInsertAllRequest.java         |  22 +-
 .../requests/table/ClientTupleInsertRequest.java   |  23 +-
 .../table/ClientTupleReplaceExactRequest.java      |  11 +-
 .../requests/table/ClientTupleReplaceRequest.java  |  23 +-
 .../requests/table/ClientTupleRequestBase.java     |   9 +-
 .../table/ClientTupleUpsertAllRequest.java         |  23 +-
 .../requests/table/ClientTupleUpsertRequest.java   |  24 +-
 .../requests/table/ClientTuplesRequestBase.java    |  20 +-
 .../table/DirectTransactionWithFirstRequest.java   | 192 ++++++++++++++
 .../tx/ClientTransactionCommitRequest.java         |   3 +-
 .../tx/ClientTransactionRollbackRequest.java       |  59 +++--
 .../client/ItThinClientTransactionsTest.java       | 175 +++++++++++--
 .../internal/client/PayloadOutputChannel.java      |  14 +-
 .../ignite/internal/client/TcpClientChannel.java   |   8 +-
 .../ignite/internal/client/sql/ClientSql.java      |  31 +--
 .../ignite/internal/client/table/ClientTable.java  |  43 +---
 .../internal/client/tx/ClientLazyTransaction.java  |  68 ++++-
 .../internal/client/tx/ClientTransaction.java      |   2 +-
 .../ignite/internal/client/tx/DirectTxUtils.java   |  86 +++++++
 38 files changed, 1164 insertions(+), 201 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index de7ddd28298..16da6fda385 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
     /**
      * Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary 
value instead of array).
      */
-    SQL_UPDATE_COUNTERS_2(18);
+    SQL_UPDATE_COUNTERS_2(18),
+
+    /**
+     * Allow rolling back direct transactions using the first request id.
+     */
+    TX_ROLLBACK_USING_FIRST_REQUEST(19);
 
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
             EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index b87b84c8ef1..ea38273ca24 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -565,6 +565,7 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             expected.set(16);
             expected.set(17);
             expected.set(18);
+            expected.set(19);
 
             assertEquals(expected, supportedFeatures);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b7e6a357fc6..52259fe9492 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -103,7 +103,8 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
             ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
-            ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
+            ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
+            ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
     ));
 
     /** Connection id generator.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f7581fb228b..baa7e788611 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -29,6 +29,7 @@ import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -96,6 +97,7 @@ import 
org.apache.ignite.client.handler.requests.sql.ClientSqlQueryMetadataReque
 import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableCommon;
 import 
org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest;
 import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
@@ -190,6 +192,7 @@ import org.apache.ignite.security.AuthenticationType;
 import org.apache.ignite.security.exception.InvalidCredentialsException;
 import 
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
 import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.table.IgniteTables;
 import org.apache.ignite.tx.RetriableTransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -292,6 +295,24 @@ public class ClientInboundMessageHandler
 
     private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
 
+    /**
+     * Tracks mappings between the first {@code requestId} and the {@code 
resourceId}
+     * holding the Tx object for directly mapped transactions. Mappings are 
created and removed in different places, as listed below.
+     *
+     * <p><b>Mappings are created:</b>
+     * <ul>
+     *     <li>When a direct transaction is created in {@link 
ClientTableCommon#readTx(ClientMessageUnpacker, HybridTimestampTracker,
+     *     ClientResourceRegistry, TxManager, IgniteTables, 
NotificationSender, long[], long, Map)}.
+     *     </li>
+     * </ul>
+     *
+     * <p><b>Mappings are removed:</b>
+     * <ul>
+     *     <li>During a commit or rollback request. Hook is added at 
creation.</li>
+     * </ul>
+     */
+    private final Map<Long, Long> firstReqToTxResMap = new 
ConcurrentHashMap<>();
+
     /**
      * Constructor.
      *
@@ -557,6 +578,7 @@ public class ClientInboundMessageHandler
             actualFeatures.clear(TX_DELAYED_ACKS.featureId());
             actualFeatures.clear(TX_PIGGYBACK.featureId());
             actualFeatures.clear(TX_ALLOW_NOOP_ENLIST.featureId());
+            actualFeatures.clear(TX_ROLLBACK_USING_FIRST_REQUEST.featureId());
 
             actualFeatures.clear(SQL_DIRECT_TX_MAPPING.featureId());
         } else {
@@ -946,69 +968,239 @@ public class ClientInboundMessageHandler
 
             case ClientOp.TUPLE_UPSERT:
                 return ClientTupleUpsertRequest.process(
-                        in, igniteTables, resources, metrics, txManager, 
clockService, notificationSender(requestId), tsTracker);
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_GET:
-                return ClientTupleGetRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService, tsTracker);
+                return ClientTupleGetRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_UPSERT_ALL:
-                return ClientTupleUpsertAllRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleUpsertAllRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_GET_ALL:
-                return ClientTupleGetAllRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService, tsTracker,
-                        
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
+                return ClientTupleGetAllRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap,
+                        
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)
+                );
 
             case ClientOp.TUPLE_GET_AND_UPSERT:
-                return ClientTupleGetAndUpsertRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleGetAndUpsertRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_INSERT:
-                return ClientTupleInsertRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleInsertRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_INSERT_ALL:
-                return ClientTupleInsertAllRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleInsertAllRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_REPLACE:
-                return ClientTupleReplaceRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleReplaceRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_REPLACE_EXACT:
-                return ClientTupleReplaceExactRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleReplaceExactRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_GET_AND_REPLACE:
-                return ClientTupleGetAndReplaceRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleGetAndReplaceRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_DELETE:
-                return ClientTupleDeleteRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleDeleteRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_DELETE_ALL:
-                return ClientTupleDeleteAllRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleDeleteAllRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_DELETE_EXACT:
-                return ClientTupleDeleteExactRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleDeleteExactRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_DELETE_ALL_EXACT:
-                return ClientTupleDeleteAllExactRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleDeleteAllExactRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_GET_AND_DELETE:
-                return ClientTupleGetAndDeleteRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService,
-                        notificationSender(requestId), tsTracker);
+                return ClientTupleGetAndDeleteRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        notificationSender(requestId),
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_CONTAINS_KEY:
-                return ClientTupleContainsKeyRequest.process(in, igniteTables, 
resources, metrics, txManager, clockService, tsTracker);
+                return ClientTupleContainsKeyRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
+                );
 
             case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
-                return ClientTupleContainsAllKeysRequest.process(in, 
igniteTables, resources, metrics, txManager, clockService, tsTracker,
-                        
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
+                return ClientTupleContainsAllKeysRequest.process(
+                        in,
+                        igniteTables,
+                        resources,
+                        metrics,
+                        txManager,
+                        clockService,
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap,
+                        
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)
+                );
 
             case ClientOp.JDBC_CONNECT:
                 return ClientJdbcConnectRequest.execute(in, 
jdbcQueryEventHandler, resolveCurrentUsername());
@@ -1057,8 +1249,14 @@ public class ClientInboundMessageHandler
                         clientContext.hasFeature(TX_PIGGYBACK), 
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);
 
             case ClientOp.TX_ROLLBACK:
-                return ClientTransactionRollbackRequest.process(in, resources, 
metrics, igniteTables,
-                        clientContext.hasFeature(TX_PIGGYBACK), 
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));
+                return ClientTransactionRollbackRequest.process(in,
+                        resources,
+                        metrics,
+                        igniteTables,
+                        firstReqToTxResMap,
+                        clientContext.hasFeature(TX_PIGGYBACK),
+                        
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES)
+                );
 
             case ClientOp.COMPUTE_EXECUTE:
                 return ClientComputeExecuteRequest.process(in, compute, 
clusterService, notificationSender(requestId), clientContext);
@@ -1112,6 +1310,7 @@ public class ClientInboundMessageHandler
                         igniteTables,
                         clockService,
                         notificationSender(requestId),
+                        firstReqToTxResMap,
                         resolveCurrentUsername(),
                         clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
                         
clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
@@ -1143,12 +1342,26 @@ public class ClientInboundMessageHandler
 
             case ClientOp.SQL_QUERY_META:
                 return ClientSqlQueryMetadataRequest.process(
-                        partitionOperationsExecutor, in, queryProcessor, 
resources, metrics, tsTracker
+                        partitionOperationsExecutor,
+                        in,
+                        queryProcessor,
+                        resources,
+                        metrics,
+                        tsTracker,
+                        requestId,
+                        firstReqToTxResMap
                 );
 
             case ClientOp.SQL_EXEC_BATCH:
                 return ClientSqlExecuteBatchRequest.process(
-                        in, queryProcessor, resources, metrics, requestId, 
cancelHandles, tsTracker,
+                        in,
+                        queryProcessor,
+                        resources,
+                        metrics,
+                        requestId,
+                        cancelHandles,
+                        tsTracker,
+                        firstReqToTxResMap,
                         resolveCurrentUsername()
                 );
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 4f0c9270235..5ab769f45ab 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -48,6 +48,7 @@ public class ClientSqlExecuteBatchRequest {
      * @param cancelHandleMap Registry of handlers. Request must register 
itself in this registry before switching to another
      *         thread.
      * @param username Authenticated user name.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future representing result of operation.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -58,6 +59,7 @@ public class ClientSqlExecuteBatchRequest {
             long requestId,
             Map<Long, CancelHandle> cancelHandleMap,
             HybridTimestampTracker tsTracker,
+            Map<Long, Long> reqToTxMap,
             String username
     ) {
         CancelHandle cancelHandle = CancelHandle.create();
@@ -71,7 +73,9 @@ public class ClientSqlExecuteBatchRequest {
                 null,
                 null,
                 null,
-                null
+                null,
+                requestId,
+                reqToTxMap
         );
 
         ClientSqlProperties props = new ClientSqlProperties(in, false);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index ec39a525a68..98760fb079f 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
      *         transaction.
      * @param notificationSender Notification sender is required to send 
acknowledge for underlying write operation within a remote
      *         transaction.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @param username Authenticated user name or {@code null} for unknown 
user.
      * @return Future representing result of operation.
      */
@@ -97,6 +98,7 @@ public class ClientSqlExecuteRequest {
             IgniteTables tables,
             ClockService clockService,
             NotificationSender notificationSender,
+            Map<Long, Long> reqToTxMap,
             @Nullable String username,
             boolean sqlMultistatementSupported,
             boolean sqlPartitionAwarenessQualifiedNameSupported,
@@ -119,7 +121,9 @@ public class ClientSqlExecuteRequest {
                 txManager,
                 tables,
                 notificationSender,
-                resIdHolder
+                resIdHolder,
+                requestId,
+                reqToTxMap
         );
 
         ClientSqlProperties props = new ClientSqlProperties(in, 
sqlMultistatementSupported);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 06c63951060..46c84d300aa 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -43,6 +44,8 @@ public class ClientSqlQueryMetadataRequest {
      * @param in Unpacker.
      * @param processor SQL API.
      * @param resources Resources.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future representing result of operation.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -51,9 +54,21 @@ public class ClientSqlQueryMetadataRequest {
             QueryProcessor processor,
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, 
resources, metrics, null, null, null, null);
+        CompletableFuture<InternalTransaction> txFut = readTx(in,
+                tsTracker,
+                resources,
+                metrics,
+                null,
+                null,
+                null,
+                null,
+                requestId,
+                reqToTxMap
+        );
 
         String schema = in.unpackString();
         String query = in.unpackString();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 2fbf674f21a..c954cac1feb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHE
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -422,6 +423,8 @@ public class ClientTableCommon {
      * @param txManager Tx manager.
      * @param notificationSender Notification sender.
      * @param resourceIdHolder Resource id holder.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Maps the id of the first request of a direct transaction to the resource id of that transaction.
      * @return Transaction, if present, or null.
      */
     public static CompletableFuture<@Nullable InternalTransaction> readTx(
@@ -432,7 +435,9 @@ public class ClientTableCommon {
             @Nullable TxManager txManager,
             @Nullable IgniteTables tables,
             @Nullable NotificationSender notificationSender,
-            long[] resourceIdHolder
+            long[] resourceIdHolder,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
         return readTx(
                 in,
@@ -443,6 +448,8 @@ public class ClientTableCommon {
                 tables,
                 notificationSender,
                 resourceIdHolder,
+                requestId,
+                reqToTxMap,
                 EnumSet.noneOf(RequestOptions.class)
         );
     }
@@ -456,6 +463,8 @@ public class ClientTableCommon {
      * @param txManager Tx manager.
      * @param notificationSender Notification sender.
      * @param resourceIdHolder Resource id holder.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Maps the id of the first request of a direct transaction to the resource id of that transaction.
      * @param options Request options. Defines how a request is processed.
      * @return Transaction, if present, or null.
      */
@@ -468,6 +477,8 @@ public class ClientTableCommon {
             @Nullable IgniteTables tables,
             @Nullable NotificationSender notificationSender,
             long[] resourceIdHolder,
+            long requestId,
+            Map<Long, Long> reqToTxMap,
             EnumSet<RequestOptions> options
     ) {
         if (in.tryUnpackNil()) {
@@ -509,11 +520,18 @@ public class ClientTableCommon {
                 });
 
                 InternalTxOptions txOptions = builder.build();
-                var tx = startExplicitTx(tsUpdater, txManager, 
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
+                var tx = new DirectTransactionWithFirstRequest(
+                        startExplicitTx(tsUpdater, txManager, 
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
+                        reqToTxMap,
+                        requestId
+                );
 
                 // Attach resource id only on first direct request.
                 resourceIdHolder[0] = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
 
+                // Record the mapping between first request and resourceId.
+                reqToTxMap.put(requestId, resourceIdHolder[0]);
+
                 metrics.transactionsActiveIncrement();
 
                 return completedFuture(tx);
@@ -596,9 +614,23 @@ public class ClientTableCommon {
             IgniteTables tables,
             EnumSet<RequestOptions> options,
             @Nullable NotificationSender notificationSender,
-            long[] resourceIdHolder
+            long[] resourceIdHolder,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readTx(in, readTs, resources, metrics, txManager, tables, 
notificationSender, resourceIdHolder, options)
+        return readTx(
+                in,
+                readTs,
+                resources,
+                metrics,
+                txManager,
+                tables,
+                notificationSender,
+                resourceIdHolder,
+                requestId,
+                reqToTxMap,
+                options
+        )
                 .thenApply(tx -> {
                     if (tx == null) {
                         // Implicit transactions do not use an observation 
timestamp because RW never depends on it,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
index eb006e5682c..4438c0cf5a0 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -47,6 +48,8 @@ public class ClientTupleContainsAllKeysRequest {
      * @param txManager    Transaction manager.
      * @param clockService Clock service.
      * @param tsTracker    Tracker.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @param supportsOptions {@code True} if supports tx options.
      * @return Future.
      */
@@ -58,11 +61,13 @@ public class ClientTupleContainsAllKeysRequest {
             TxManager txManager,
             ClockService clockService,
             HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap,
             boolean supportsOptions
     ) {
         EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, 
HAS_OPTIONS) : of(KEY_ONLY);
 
-        return ClientTuplesRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, options)
+        return ClientTuplesRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().containsAllAsync(req.tx(), req.tuples())
                         .thenApply(containsAll -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index 8a5ffd08f65..1c41263cf26 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +45,8 @@ public class ClientTupleContainsKeyRequest {
      * @param tables    Ignite tables.
      * @param resources Resource registry.
      * @param txManager Transaction manager.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -52,9 +56,11 @@ public class ClientTupleContainsKeyRequest {
             ClientHandlerMetricSource metrics,
             TxManager txManager,
             ClockService clockService,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return ClientTupleRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
+        return readAsync(in, tables, resources, metrics, txManager, null, 
tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().containsAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index e53caedd770..b8183426664 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleDeleteAllExactRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleDeleteAllExactRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             writeTuples(out, skippedTuples, 
req.table().schemaView());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 68c5b8ecfbe..0e3cc2b9a94 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +47,8 @@ public class ClientTupleDeleteAllRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -55,9 +59,11 @@ public class ClientTupleDeleteAllRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return ClientTuplesRequestBase.readAsync(in, tables, resources, 
metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index 82233f81bf9..c95f09358bc 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleDeleteExactRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -54,14 +57,28 @@ public class ClientTupleDeleteExactRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().deleteExactAsync(req.tx(), req.tuple())
-                        .thenApply(res -> out -> {
-                            writeTxMeta(out, tsTracker, clockService, req);
-                            
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
-                            out.packBoolean(res);
-                        }));
+                    .thenApply(res -> out -> {
+                        writeTxMeta(out, tsTracker, clockService, req);
+                        
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
+                        out.packBoolean(res);
+                    })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index 339227e18e4..6cbf6115aeb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.client.handler.requests.table;
 import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +45,8 @@ public class ClientTupleDeleteRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -53,9 +57,11 @@ public class ClientTupleDeleteRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return ClientTupleRequestBase.readAsync(in, tables, resources, 
metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().deleteAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index 199464b3385..ead2d85f276 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -49,6 +50,8 @@ public class ClientTupleGetAllRequest {
      * @param txManager    Transaction manager.
      * @param clockService Clock service.
      * @param tsTracker    Tracker.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @param supportsOptions {@code True} if supports tx options.
      * @return Future.
      */
@@ -60,11 +63,13 @@ public class ClientTupleGetAllRequest {
             TxManager txManager,
             ClockService clockService,
             HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap,
             boolean supportsOptions
     ) {
         EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, 
HAS_OPTIONS) : of(KEY_ONLY);
 
-        return ClientTuplesRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, options)
+        return ClientTuplesRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
                 .thenCompose(req -> {
                     return req.table().recordView().getAllAsync(req.tx(), 
req.tuples())
                             .thenApply(resTuples -> out -> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 5133a14bcf5..795a0435fdc 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.client.handler.requests.table;
 import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +46,8 @@ public class ClientTupleGetAndDeleteRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -54,9 +58,11 @@ public class ClientTupleGetAndDeleteRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return ClientTupleRequestBase.readAsync(in, tables, resources, 
metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().getAndDeleteAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index 6fb485021d4..36dc64c8f1c 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleGetAndReplaceRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleGetAndReplaceRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().getAndReplaceAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, resTuple, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index d8f3420df2a..620e3191580 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleGetAndUpsertRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleGetAndUpsertRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().getAndUpsertAsync(req.tx(), req.tuple())
                         .thenApply(resTuple -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             ClientTableCommon.writeTupleOrNil(out, resTuple, 
TuplePart.KEY_AND_VAL, req.table().schemaView());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 48fd2ca0062..2d71d2cd828 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +46,8 @@ public class ClientTupleGetRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param clockService Clock service.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -53,9 +57,11 @@ public class ClientTupleGetRequest {
             ClientHandlerMetricSource metrics,
             TxManager txManager,
             ClockService clockService,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return ClientTupleRequestBase.readAsync(in, tables, resources, 
metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
+        return readAsync(in, tables, resources, metrics, txManager, null, 
tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
                 .thenCompose(req -> 
req.table().recordView().getAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index ecc68421db5..b5fc7d80f40 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -47,6 +48,8 @@ public class ClientTupleInsertAllRequest {
      * @param txManager Ignite transactions.
      * @param clockService Clock service.
      * @param notificationSender Notification sender.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -57,13 +60,26 @@ public class ClientTupleInsertAllRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().insertAllAsync(req.tx(), req.tuples())
                         .thenApply(skippedTuples -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             writeTuples(out, skippedTuples, 
req.table().schemaView());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index c8bfe6b60e2..1dada6202e2 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -46,6 +47,8 @@ public class ClientTupleInsertRequest {
      * @param txManager Ignite transactions.
      * @param clockService Clock service.
      * @param notificationSender Notification sender.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -56,14 +59,28 @@ public class ClientTupleInsertRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().insertAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index ecf2c300eac..a94f77b3db8 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.of;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_SECOND_TUPLE;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +44,8 @@ public class ClientTupleReplaceExactRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -53,7 +56,9 @@ public class ClientTupleReplaceExactRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
         return ClientTupleRequestBase.readAsync(
                         in,
@@ -63,7 +68,9 @@ public class ClientTupleReplaceExactRequest {
                         txManager,
                         notificationSender,
                         tsTracker,
-                        of(READ_SECOND_TUPLE)
+                        of(READ_SECOND_TUPLE),
+                        requestId,
+                        reqToTxMap
                 )
                 .thenCompose(req -> 
req.table().recordView().replaceExactAsync(req.tx(), req.tuple(), req.tuple2())
                         .thenApply(res -> out -> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index 86d6d58078d..ed5f141342e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleReplaceRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -54,14 +57,28 @@ public class ClientTupleReplaceRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().replaceAsync(req.tx(), req.tuple())
                         .thenApply(res -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
                             out.packBoolean(res);
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index b2cdc608a87..66fd12c134d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
 
 import java.util.BitSet;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -93,7 +94,9 @@ class ClientTupleRequestBase {
             TxManager txManager,
             @Nullable NotificationSender notificationSender,
             HybridTimestampTracker tsTracker,
-            EnumSet<RequestOptions> options
+            EnumSet<RequestOptions> options,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
         int tableId = in.unpackInt();
 
@@ -108,7 +111,9 @@ class ClientTupleRequestBase {
                 tables,
                 options,
                 notificationSender,
-                resIdHolder
+                resIdHolder,
+                requestId,
+                reqToTxMap
         );
 
         int schemaId = in.unpackInt();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index 1d6604e0290..a6f4d14beef 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleUpsertAllRequest {
      * @param tables Ignite tables.
      * @param resources Resource registry.
      * @param txManager Ignite transactions.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -54,13 +57,27 @@ public class ClientTupleUpsertAllRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().upsertAllAsync(req.tx(), req.tuples())
                         .thenApply(v -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index 58efb8accc8..d549ec8ee82 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -37,6 +38,7 @@ import org.apache.ignite.table.IgniteTables;
  * Client tuple upsert request.
  */
 public class ClientTupleUpsertRequest {
+
     /**
      * Processes the request.
      *
@@ -46,6 +48,8 @@ public class ClientTupleUpsertRequest {
      * @param txManager Ignite transactions.
      * @param clockService Clock service.
      * @param notificationSender Notification sender.
+     * @param requestId Id of the request.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -56,13 +60,27 @@ public class ClientTupleUpsertRequest {
             TxManager txManager,
             ClockService clockService,
             NotificationSender notificationSender,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
-        return readAsync(in, tables, resources, metrics, txManager, 
notificationSender, tsTracker, noneOf(RequestOptions.class))
+        return readAsync(
+                in,
+                tables,
+                resources,
+                metrics,
+                txManager,
+                notificationSender,
+                tsTracker,
+                noneOf(RequestOptions.class),
+                requestId,
+                reqToTxMap
+        )
                 .thenCompose(req -> 
req.table().recordView().upsertAsync(req.tx(), req.tuple())
                         .thenApply(v -> out -> {
                             writeTxMeta(out, tsTracker, clockService, req);
                             
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
-                        }));
+                        })
+                );
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index 43961bcd6d3..05975e62fcc 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -77,14 +78,27 @@ class ClientTuplesRequestBase {
             TxManager txManager,
             @Nullable NotificationSender notificationSender,
             HybridTimestampTracker tsTracker,
-            EnumSet<RequestOptions> options
+            EnumSet<RequestOptions> options,
+            long requestId,
+            Map<Long, Long> reqToTxMap
     ) {
         int tableId = in.unpackInt();
 
         long[] resIdHolder = {0};
 
-        CompletableFuture<InternalTransaction> txFut =
-                readOrStartImplicitTx(in, tsTracker, resources, metrics, 
txManager, tables, options, notificationSender, resIdHolder);
+        CompletableFuture<InternalTransaction> txFut = readOrStartImplicitTx(
+                in,
+                tsTracker,
+                resources,
+                metrics,
+                txManager,
+                tables,
+                options,
+                notificationSender,
+                resIdHolder,
+                requestId,
+                reqToTxMap
+        );
 
         int schemaId = in.unpackInt();
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
new file mode 100644
index 00000000000..e5482e1ea56
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator around an {@link InternalTransaction} that remembers the id of the first client request of the transaction.
+ *
+ * <p>Every call is forwarded to the wrapped transaction. In addition, every terminating operation (commit, rollback, kill, finish)
+ * removes the entry keyed by the first request id from the shared request tracking map once the operation has completed,
+ * whether it succeeded or failed.
+ */
+class DirectTransactionWithFirstRequest implements InternalTransaction, Wrapper {
+    /** Wrapped transaction that does the actual work. */
+    private final InternalTransaction base;
+
+    /**
+     * Shared tracker keyed by first request id; this class only ever removes its own key from it.
+     * We could also just accept a lambda.
+     */
+    private final Map<Long, Long> reqToTxMap;
+
+    /** Id of the first request of the transaction; the key removed from {@link #reqToTxMap} when the transaction ends. */
+    private final long firstReqId;
+
+    /**
+     * Constructor.
+     *
+     * @param base Transaction to delegate to.
+     * @param reqToTxMap Tracker for first request of direct transactions.
+     * @param firstReqId Id of the first request of the transaction.
+     */
+    DirectTransactionWithFirstRequest(InternalTransaction base, Map<Long, Long> reqToTxMap, long firstReqId) {
+        this.base = base;
+        this.reqToTxMap = reqToTxMap;
+        this.firstReqId = firstReqId;
+    }
+
+    @Override
+    public UUID id() {
+        return base.id();
+    }
+
+    @Override
+    public PendingTxPartitionEnlistment enlistedPartition(ZonePartitionId replicationGroupId) {
+        return base.enlistedPartition(replicationGroupId);
+    }
+
+    @Override
+    public TxState state() {
+        return base.state();
+    }
+
+    @Override
+    public boolean assignCommitPartition(ZonePartitionId commitPartitionId) {
+        return base.assignCommitPartition(commitPartitionId);
+    }
+
+    @Override
+    public ZonePartitionId commitPartition() {
+        return base.commitPartition();
+    }
+
+    @Override
+    public void enlist(ZonePartitionId replicationGroupId, int tableId, String primaryNodeConsistentId, long consistencyToken) {
+        base.enlist(replicationGroupId, tableId, primaryNodeConsistentId, consistencyToken);
+    }
+
+    @Override
+    public @Nullable HybridTimestamp readTimestamp() {
+        return base.readTimestamp();
+    }
+
+    @Override
+    public HybridTimestamp schemaTimestamp() {
+        return base.schemaTimestamp();
+    }
+
+    @Override
+    public UUID coordinatorId() {
+        return base.coordinatorId();
+    }
+
+    @Override
+    public boolean implicit() {
+        return base.implicit();
+    }
+
+    @Override
+    public boolean remote() {
+        return base.remote();
+    }
+
+    @Override
+    public boolean remoteOnCoordinator() {
+        return base.remoteOnCoordinator();
+    }
+
+    /** Delegates to the wrapped transaction and drops the first-request mapping when the returned future completes. */
+    @Override
+    public CompletableFuture<Void> finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full,
+            @Nullable Throwable finishReason) {
+        return base.finish(commit, executionTimestamp, full, finishReason).whenComplete((v, err) -> removeMapping());
+    }
+
+    @Override
+    public boolean isFinishingOrFinished() {
+        return base.isFinishingOrFinished();
+    }
+
+    @Override
+    public long getTimeout() {
+        return base.getTimeout();
+    }
+
+    /** Delegates to the wrapped transaction and drops the first-request mapping when the returned future completes. */
+    @Override
+    public CompletableFuture<Void> kill() {
+        return base.kill().whenComplete((v, err) -> removeMapping());
+    }
+
+    /** Delegates to the wrapped transaction and drops the first-request mapping when the returned future completes. */
+    @Override
+    public CompletableFuture<Void> rollbackWithExceptionAsync(Throwable throwable) {
+        return base.rollbackWithExceptionAsync(throwable).whenComplete((v, err) -> removeMapping());
+    }
+
+    @Override
+    public boolean isRolledBackWithTimeoutExceeded() {
+        return base.isRolledBackWithTimeoutExceeded();
+    }
+
+    @Override
+    public void processDelayedAck(Object val, @Nullable Throwable err) {
+        base.processDelayedAck(val, err);
+    }
+
+    /** Commits the wrapped transaction; the first-request mapping is dropped even if the commit throws. */
+    @Override
+    public void commit() throws TransactionException {
+        try {
+            base.commit();
+        } finally {
+            removeMapping();
+        }
+    }
+
+    /** Delegates to the wrapped transaction and drops the first-request mapping when the returned future completes. */
+    @Override
+    public CompletableFuture<Void> commitAsync() {
+        return base.commitAsync().whenComplete((v, err) -> removeMapping());
+    }
+
+    /** Rolls back the wrapped transaction; the first-request mapping is dropped even if the rollback throws. */
+    @Override
+    public void rollback() throws TransactionException {
+        try {
+            base.rollback();
+        } finally {
+            removeMapping();
+        }
+    }
+
+    /** Delegates to the wrapped transaction and drops the first-request mapping when the returned future completes. */
+    @Override
+    public CompletableFuture<Void> rollbackAsync() {
+        return base.rollbackAsync().whenComplete((v, err) -> removeMapping());
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return base.isReadOnly();
+    }
+
+    @Override
+    public RuntimeException enlistFailedException() {
+        return base.enlistFailedException();
+    }
+
+    /**
+     * Returns the wrapped transaction.
+     *
+     * @return Wrapped transaction.
+     */
+    public InternalTransaction base() {
+        return base;
+    }
+
+    /**
+     * Returns the wrapped transaction as an instance of the requested class.
+     *
+     * @param classToUnwrap Class to unwrap to.
+     * @return Wrapped transaction cast to {@code classToUnwrap}.
+     * @throws ClassCastException If the wrapped transaction is not an instance of {@code classToUnwrap}.
+     */
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        // Checked cast: a wrong target type fails right here instead of slipping through an unchecked (T) cast
+        // and surfacing later at the caller.
+        return classToUnwrap.cast(base);
+    }
+
+    /** Removes the entry of the first request of this transaction from the tracker; a no-op if it is already gone. */
+    private void removeMapping() {
+        reqToTxMap.remove(firstReqId);
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index df377d133ae..d2d26703256 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.tx.TransactionException;
 
 /**
@@ -89,7 +90,7 @@ public class ClientTransactionCommitRequest {
                 // Update causality. Used to assign commit timestamp after all 
enlistments.
                 
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
 
-                ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+                ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, 
ReadWriteTransactionImpl.class);
 
                 // Enforce cleanup.
                 tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index c6492dfa99a..9fee94a501a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.tx;
 
 import static 
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest.merge;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -29,6 +30,9 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.ErrorGroups.Client;
+import org.apache.ignite.lang.IgniteException;
 
 /**
  * Client transaction rollback request.
@@ -41,6 +45,7 @@ public class ClientTransactionRollbackRequest {
      * @param resources Resources.
      * @param metrics Metrics.
      * @param igniteTables Tables facade.
+     * @param reqToTxMap Tracker for first request of direct transactions.
      * @param enableDirectMapping Enable direct mapping.
      * @param sendRemoteWritesFlag Send remote writes flag.
      * @return Future.
@@ -50,37 +55,55 @@ public class ClientTransactionRollbackRequest {
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
             IgniteTablesInternal igniteTables,
+            Map<Long, Long> reqToTxMap,
             boolean enableDirectMapping,
             boolean sendRemoteWritesFlag
     )
             throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
 
-        InternalTransaction tx = 
resources.remove(resourceId).get(InternalTransaction.class);
+        InternalTransaction tx;
 
-        if (enableDirectMapping && !tx.isReadOnly()) {
-            // Attempt to merge server and client transactions.
-            int cnt = in.unpackInt(); // Number of direct enlistments.
-            for (int i = 0; i < cnt; i++) {
-                int tableId = in.unpackInt();
-                int partId = in.unpackInt();
-                String consistentId = in.unpackString();
-                long token = in.unpackLong();
+        if (!enableDirectMapping) {
+            tx = resources.remove(resourceId).get(InternalTransaction.class);
+        } else if (resourceId < 0) {
+            // Direct mapping is enabled, but the client does not know the 
resourceId, so it sent the id of the first request instead.
+            long reqId = -resourceId;
+            var actualResourceId = reqToTxMap.get(reqId);
 
-                TableViewInternal table = igniteTables.cachedTable(tableId);
+            if (actualResourceId == null) {
+                throw new IgniteException(Client.RESOURCE_NOT_FOUND_ERR, 
"Failed to find resource from requestId: " + reqId);
+            }
+
+            tx = 
resources.remove(actualResourceId).get(InternalTransaction.class);
+            // Do not remove the entry from reqToTxMap right away: it will be 
removed automatically on rollback.
+        } else {
+            tx = resources.remove(resourceId).get(InternalTransaction.class);
+
+            if (!tx.isReadOnly()) {
+                // Attempt to merge server and client transactions.
+                int cnt = in.unpackInt(); // Number of direct enlistments.
+                for (int i = 0; i < cnt; i++) {
+                    int tableId = in.unpackInt();
+                    int partId = in.unpackInt();
+                    String consistentId = in.unpackString();
+                    long token = in.unpackLong();
 
-                if (table != null) {
-                    merge(table.internalTable(), partId, consistentId, token, 
tx, false);
+                    TableViewInternal table = 
igniteTables.cachedTable(tableId);
+
+                    if (table != null) {
+                        merge(table.internalTable(), partId, consistentId, 
token, tx, false);
+                    }
                 }
-            }
 
-            if (cnt > 0) {
-                in.unpackLong(); // Unpack causality.
+                if (cnt > 0) {
+                    in.unpackLong(); // Unpack causality.
 
-                ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+                    ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, 
ReadWriteTransactionImpl.class);
 
-                // Enforce cleanup.
-                tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
+                    // Enforce cleanup.
+                    tx0.noRemoteWrites(sendRemoteWritesFlag && 
in.unpackBoolean());
+                }
             }
         }
 
diff --git 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
index 7d18e4b3478..7d87e7da0ad 100644
--- 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
+++ 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
@@ -23,10 +23,14 @@ import static java.util.Comparator.comparing;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.publicException;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.publicExceptionWithHint;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -60,6 +64,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.handler.ClientInboundMessageHandler;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
@@ -90,7 +95,7 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -1424,34 +1429,160 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, 
key2)), willSucceedFast());
     }
 
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947")
-    public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest() 
throws InterruptedException {
-        // Note: reversed tx priority is required for this test.
-        ClientTable table = (ClientTable) table();
-        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+    @Nested
+    class OnConflictDuringFirstRequest {
+        class Data {
+            final IgniteImpl ignite;
+            final List<Tuple> tuples;
+            final ClientLazyTransaction tx1;
+            final ClientLazyTransaction tx2;
+            final CompletableFuture<?> req2Fut;
+
+            Data(
+                    IgniteImpl ignite,
+                    List<Tuple> tuples,
+                    ClientLazyTransaction tx1,
+                    ClientLazyTransaction tx2,
+                    CompletableFuture<?> req2Fut
+            ) {
+                this.ignite = ignite;
+                this.tuples = tuples;
+                this.tx1 = tx1;
+                this.tx2 = tx2;
+                this.req2Fut = req2Fut;
+            }
+        }
 
-        Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
-        List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, 0, table);
+        Data prepareBlockedTransaction(KillTestContext ctx) throws 
InterruptedException {
+            ClientTable table = (ClientTable) table();
+            ClientSql sql = (ClientSql) client().sql();
 
-        // We need a waiter for this scenario.
-        Tuple key = tuples0.get(0);
-        Tuple val = val("1");
+            Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
+            Entry<Partition, ClusterNode> mapping = 
map.entrySet().iterator().next();
+            List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int) 
mapping.getKey().id(), table);
+            Ignite server = server(mapping.getValue());
+            IgniteImpl ignite = unwrapIgniteImpl(server);
+
+            // Init SQL mappings.
+            Tuple key0 = tuples0.get(0);
+            sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", 
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                    key0.intValue(0), key0.intValue(0) + "");
+            await().atMost(2, TimeUnit.SECONDS)
+                    .until(() -> 
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+            // We need a waiter for this scenario.
+            Tuple key = tuples0.get(1);
+
+            ClientLazyTransaction tx2 = (ClientLazyTransaction) 
client().transactions().begin();
+            ClientLazyTransaction tx1 = (ClientLazyTransaction) 
client().transactions().begin();
+
+            // Starts the transaction.
+            assertThat(ctx.put.apply(client(), tx1, key), willSucceedIn(120, 
TimeUnit.SECONDS));
+
+            await().atMost(3, TimeUnit.SECONDS).until(() -> {
+                Iterator<Lock> locks = 
ignite.txManager().lockManager().locks(tx1.startedTx().txId());
+
+                int count = CollectionUtils.count(locks);
+                return count == 2;
+            });
+
+            // Will wait for lock.
+            CompletableFuture<?> fut2 = ctx.put.apply(client(), tx2, key);
+            Thread.sleep(500);
+
+            assertThat(fut2.isDone(), is(false));
+            IgniteTestUtils.assertThrows(AssertionError.class, () -> 
ClientTransaction.get(tx2), "Transaction is starting");
+
+            return new Data(ignite, tuples0, tx1, tx2, fut2);
+        }
 
-        ClientLazyTransaction tx1 = (ClientLazyTransaction) 
client().transactions().begin();
-        ClientLazyTransaction tx2 = (ClientLazyTransaction) 
client().transactions().begin();
+        @ParameterizedTest
+        
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+        public void testRollbackDoesNotBlock(KillTestContext ctx) throws 
InterruptedException {
+            var test = prepareBlockedTransaction(ctx);
 
-        kvView.put(tx1, key, val);
+            // Rollback should not be blocked.
+            assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, 
TimeUnit.SECONDS));
+            assertThat(test.req2Fut, willThrowFast(
+                    ctx.expectedErr,
+                    "Can't acquire a lock because transaction is already 
finished"));
 
-        // Will wait for lock.
-        CompletableFuture<Void> fut2 = kvView.putAsync(tx2, key, val);
-        assertFalse(fut2.isDone());
+            assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, 
TimeUnit.SECONDS));
 
-        Thread.sleep(500);
+            var ex = assertThrows(TransactionException.class, () -> 
ClientTransaction.get(test.tx2));
+            assertThat(ex.getMessage(), containsString("Transaction is already 
finished"));
+            assertThat(ex.getMessage(), containsString("committed=false"));
 
-        // Rollback should not be blocked.
-        assertThat(tx2.rollbackAsync(), willSucceedFast());
-        assertThat(tx1.rollbackAsync(), willSucceedFast());
+            KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+            assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), 
willSucceedIn(5, TimeUnit.SECONDS));
+        }
+
+        @ParameterizedTest
+        
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+        public void testOperationsBlockWaitingForLock(KillTestContext ctx) 
throws InterruptedException {
+            var test = prepareBlockedTransaction(ctx);
+
+            CompletableFuture<?> fut3 = ctx.put.apply(client(), test.tx2, 
test.tuples.get(2));
+
+            assertDoesNotThrow(test.tx1::startedTx);
+
+            assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, 
TimeUnit.SECONDS));
+
+            // After the lock is open, the requests are free to complete.
+            assertThat(test.req2Fut, willSucceedIn(1, TimeUnit.SECONDS));
+            assertThat(fut3, willSucceedIn(1, TimeUnit.SECONDS));
+
+            assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, 
TimeUnit.SECONDS));
+
+            KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+            assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 3)), 
willSucceedIn(5, TimeUnit.SECONDS));
+        }
+
+        @ParameterizedTest
+        
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+        public void testCancelByRequestIdNotAvailable(KillTestContext ctx) 
throws InterruptedException {
+            var test = prepareBlockedTransaction(ctx);
+
+            // Remove firstReqMapping from the server side.
+            {
+                Map<Long, Long> firstReqToTxResMap = 
IgniteTestUtils.getFieldValue(
+                        test.ignite.clientInboundMessageHandler(),
+                        ClientInboundMessageHandler.class,
+                        "firstReqToTxResMap"
+                );
+
+                CompletableFuture<Object> reqInfoFut = 
IgniteTestUtils.getFieldValue(test.tx2, ClientLazyTransaction.class,
+                        "requestInfoFuture");
+                Object requestInfo = reqInfoFut.join();
+                long firstReqId = IgniteTestUtils.getFieldValue(requestInfo, 
"firstReqId");
+                firstReqToTxResMap.remove(firstReqId);
+            }
+
+            // Will block because of the error.
+            var rollbackTx2Fut1 = test.tx2.rollbackAsync();
+            Thread.sleep(1_000);
+            assertThat(rollbackTx2Fut1.isDone(), is(false));
+
+            // Do another concurrent rollback call just to make sure.
+            // If we allow multiple rollback requests by id to be sent 
concurrently, the outcome might be different.
+            var rollbackTx2Fut2 = test.tx2.rollbackAsync();
+            Thread.sleep(1_000);
+            assertThat(rollbackTx2Fut2.isDone(), is(false));
+
+            // Now unblock the transaction.
+            assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, 
TimeUnit.SECONDS));
+
+            // Requests should rollback.
+            assertThat(rollbackTx2Fut1, willSucceedIn(1, TimeUnit.SECONDS));
+            assertThat(rollbackTx2Fut2, willSucceedIn(1, TimeUnit.SECONDS));
+
+            var ex = assertThrows(TransactionException.class, () -> 
ClientTransaction.get(test.tx2));
+            assertThat(ex.getMessage(), containsString("Transaction is already 
finished"));
+            assertThat(ex.getMessage(), containsString("committed=false"));
+
+            KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+            assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), 
willSucceedIn(5, TimeUnit.SECONDS));
+        }
     }
 
     @ParameterizedTest
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index d932247b62f..6c9c7c5711a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -17,8 +17,9 @@
 
 package org.apache.ignite.internal.client;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Thin client payload output channel.
@@ -33,8 +34,8 @@ public class PayloadOutputChannel implements AutoCloseable {
     /** Client request ID. */
     private final long requestId;
 
-    /** Action to be executed when the payload is sent. */
-    private volatile @Nullable Runnable onSentAction;
+    /** Actions to be executed when the payload is sent. */
+    private final List<Runnable> onSentActions;
 
     /**
      * Constructor.
@@ -47,6 +48,7 @@ public class PayloadOutputChannel implements AutoCloseable {
         this.ch = ch;
         this.out = out;
         this.requestId = requestId;
+        this.onSentActions = new ArrayList<>();
     }
 
     /**
@@ -88,11 +90,11 @@ public class PayloadOutputChannel implements AutoCloseable {
      * @param action Action to be executed.
      */
     public void onSent(Runnable action) {
-        this.onSentAction = action;
+        this.onSentActions.add(action);
     }
 
     /** Returns an action, if any, that should be executed when the payload is 
sent successfully. */
-    @Nullable Runnable onSentAction() {
-        return onSentAction;
+    List<Runnable> onSentActions() {
+        return onSentActions;
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 0a9322426a1..ee07a6d6cd8 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -33,6 +33,7 @@ import io.netty.channel.ChannelFuture;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +107,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
-            ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
+            ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
+            ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
     ));
 
     /** Minimum supported heartbeat interval. */
@@ -405,6 +407,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 payloadWriter.accept(payloadCh);
             }
 
+            var actions = 
Collections.unmodifiableList(payloadCh.onSentActions());
             write(req).addListener(f -> {
                 if (!f.isSuccess()) {
                     String msg = "Failed to send request async [id=" + id + ", 
op=" + opCode + ", remoteAddress=" + cfg.getAddress() + "]";
@@ -420,8 +423,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 } else {
                     metrics.requestsSentIncrement();
 
-                    Runnable action = payloadCh.onSentAction();
-                    if (action != null) {
+                    for (Runnable action : actions) {
                         asyncContinuationExecutor.execute(action);
                     }
                 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 694191eb744..5ea341b372e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -337,13 +337,21 @@ public class ClientSql implements IgniteSql {
 
         return txStartFut.thenCompose(tx -> ch.serviceAsync(
                 ClientOp.SQL_EXEC,
-                payloadWriter(ctx, transaction, cancellationToken, 
queryModifiers, statement, arguments, shouldTrackOperation),
+                DirectTxUtils.payloadWriter(
+                        ctx,
+                        transaction,
+                        payloadWriter(ctx, transaction, cancellationToken, 
queryModifiers, statement, arguments, shouldTrackOperation)
+                ),
                 payloadReader(ctx, mapper, tx, statement),
                 () -> DirectTxUtils.resolveChannel(ctx, ch, 
shouldTrackOperation, tx, mapping),
                 null,
                 false
         ).handle((BiFunction<AsyncResultSet<T>, Throwable, 
CompletableFuture<AsyncResultSet<T>>>) (r, err) -> {
             if (err != null) {
+                if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch)) {
+                    return failedFuture(err);
+                }
+
                 if (tx != null && shouldRecordTransactionFailure(err)) {
                     tx.recordOperationFailure(err);
                 }
@@ -352,26 +360,7 @@ public class ClientSql implements IgniteSql {
                     return failedFuture(err);
                 }
 
-                if (ctx.enlistmentToken != null) {
-                    // In case of direct mapping error need to rollback the tx 
on coordinator.
-                    return tx.rollbackAsync().handle((ignored, err0) -> {
-                        if (err0 != null) {
-                            err.addSuppressed(err0);
-                        }
-
-                        sneakyThrow(err);
-                        return null;
-                    });
-                } else {
-                    return 
tx.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
-                        if (err0 != null) {
-                            err.addSuppressed(err0);
-                        }
-
-                        sneakyThrow(err);
-                        return null;
-                    });
-                }
+                return DirectTxUtils.handleErrorOnOtherRequests(ctx, tx, err);
             }
 
             return completedFuture(r);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index b4b944c13d5..1c1e50678e4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -497,7 +497,7 @@ public class ClientTable implements Table {
                     return txStartFut.thenCompose(tx0 -> {
                         return ch.serviceAsync(
                                 opCode,
-                                w -> writer.accept(schema, w, ctx),
+                                DirectTxUtils.payloadWriter(ctx, tx, w -> 
writer.accept(schema, w, ctx)),
                                 r -> readSchemaAndReadData(schema, r, reader, 
defaultValue, responseSchemaRequired, ctx, tx0),
                                 () -> DirectTxUtils.resolveChannel(ctx, ch, 
ClientOp.isWrite(opCode), tx0, pm),
                                 retryPolicyOverride,
@@ -508,13 +508,7 @@ public class ClientTable implements Table {
                                     if (ex != null) {
                                         Throwable cause = ex;
 
-                                        if (ctx.firstReqFut != null) {
-                                            // Create failed transaction.
-                                            ClientTransaction failed = new 
ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
-                                                    ctx.pm, null, 
ch.observableTimestamp(), 0);
-                                            failed.fail();
-                                            ctx.firstReqFut.complete(failed);
-                                            // Txn was not started, rollback 
is not required.
+                                        if 
(DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch)) {
                                             
fut.completeExceptionally(unwrapCause(ex));
                                             return null;
                                         }
@@ -547,37 +541,20 @@ public class ClientTable implements Table {
 
                                                 cause = cause.getCause();
                                             }
+                                        }
 
-                                            if (tx0 == null) {
-                                                fut.completeExceptionally(ex);
-                                            } else {
-                                                
tx0.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
-                                                    if (err0 != null) {
-                                                        ex.addSuppressed(err0);
-                                                    }
-
-                                                    
fut.completeExceptionally(ex);
-
-                                                    return (T) null;
-                                                });
-                                            }
+                                        if (tx0 == null) {
+                                            fut.completeExceptionally(ex);
                                         } else {
-                                            // In case of direct mapping error 
we need to rollback the tx on coordinator.
-                                            
tx0.rollbackAsync().handle((ignored, err0) -> {
-                                                if (err0 != null) {
-                                                    ex.addSuppressed(err0);
-                                                }
-
-                                                fut.completeExceptionally(ex);
-
-                                                return (T) null;
-                                            });
+                                            
DirectTxUtils.handleErrorOnOtherRequests(ctx, tx0, ex)
+                                                    .whenComplete((ignored, 
err0) -> fut.completeExceptionally(err0));
                                         }
+
+                                        return null;
                                     } else {
                                         fut.complete(ret);
+                                        return null;
                                     }
-
-                                    return null;
                                 });
                     });
                 }).exceptionally(ex -> {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 70bf1765f9f..d22759aa1b5 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -17,14 +17,18 @@
 
 package org.apache.ignite.internal.client.tx;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST;
 import static 
org.apache.ignite.internal.client.tx.ClientTransactions.USE_CONFIGURED_TIMEOUT_DEFAULT;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.EnumSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -39,12 +43,24 @@ import org.jetbrains.annotations.Nullable;
  * Lazy client transaction. Will be actually started on the first operation.
  */
 public class ClientLazyTransaction implements Transaction {
+    private static final CompletableFuture<Void> 
TX_INFO_ALREADY_RECEIVED_FUTURE =
+            failedFuture(new 
UnsupportedOperationException("TX_ROLLBACK_USING_FIRST_REQUEST was skipped 
because tx info already present."));
+
+    private static final CompletableFuture<Void> NOT_SUPPORTED_FUTURE =
+            failedFuture(new 
UnsupportedOperationException("TX_ROLLBACK_USING_FIRST_REQUEST is not 
supported"));
+
     private final long observableTimestamp;
 
     private final @Nullable TransactionOptions options;
 
     private final EnumSet<ClientInternalTxOptions> txOptions;
 
+    private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+    private final CompletableFuture<RequestInfo> requestInfoFuture = new 
CompletableFuture<>();
+
+    private final CompletableFuture<Void> rollbackFuture = new 
CompletableFuture<>();
+
     private volatile CompletableFuture<ClientTransaction> tx;
 
     ClientLazyTransaction(HybridTimestampTracker observableTimestamp, 
@Nullable TransactionOptions options) {
@@ -94,12 +110,47 @@ public class ClientLazyTransaction implements Transaction {
     public CompletableFuture<Void> rollbackAsync() {
         var tx0 = tx;
 
+        // TODO: IGNITE-28405 This is really fishy. It will probably let you 
reuse a transaction after calling a rollback :(
         if (tx0 == null) {
             // No operations were performed, nothing to rollback.
             return nullCompletedFuture();
         }
 
-        return tx0.thenCompose(ClientTransaction::rollbackAsync);
+        if (cancelled.compareAndSet(false, true)) {
+            return CompletableFuture.anyOf(requestInfoFuture, tx0)
+                    .thenCompose(input -> {
+                        if (tx0.isDone()) {
+                            return TX_INFO_ALREADY_RECEIVED_FUTURE;
+                        }
+
+                        // The input must be from requestInfoFuture
+                        RequestInfo reqInfo = (RequestInfo) input;
+                        ClientChannel ch = reqInfo.ch;
+                        if 
(ch.protocolContext().isFeatureSupported(TX_ROLLBACK_USING_FIRST_REQUEST)) {
+                            return ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> 
w.out().packLong(-reqInfo.firstReqId), r -> null);
+                        } else {
+                            return NOT_SUPPORTED_FUTURE;
+                        }
+                    })
+                    .handle((res, e) -> {
+                        // If ok, we don't need to wait for the response. If 
error, let's block.
+                        if (e == null) {
+                            return nullCompletedFuture();
+                        } else {
+                            return 
tx0.thenCompose(ClientTransaction::rollbackAsync);
+                        }
+                    })
+                    .thenCompose(f -> (CompletableFuture<Void>) f)
+                    .whenComplete((res, e) -> {
+                        if (e != null) {
+                            rollbackFuture.completeExceptionally(e);
+                        } else {
+                            rollbackFuture.complete(null);
+                        }
+                    });
+        } else {
+            return rollbackFuture;
+        }
     }
 
     @Override
@@ -221,8 +272,23 @@ public class ClientLazyTransaction implements Transaction {
         return txOptions;
     }
 
+    public void updateRequestInfo(long firstReqId, ClientChannel ch) {
+        boolean s = this.requestInfoFuture.complete(new 
RequestInfo(firstReqId, ch));
+        assert s : "Transaction request info was previously set";
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
     }
+
+    private static class RequestInfo {
+        private final long firstReqId;
+        private final ClientChannel ch;
+
+        private RequestInfo(long firstReqId, ClientChannel ch) {
+            this.firstReqId = firstReqId;
+            this.ch = ch;
+        }
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index eaab408ace6..b368b049fe2 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -86,7 +86,7 @@ public class ClientTransaction implements Transaction {
     @IgniteToStringExclude
     private final ClientChannel ch;
 
-    /** Transaction id. */
+    /** Node-local resource id for the Transaction. */
     private final long id;
 
     /** The future used on repeated commit/rollback. */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 811a5689593..d0ce111c6ba 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
 import static 
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.PartitionMapping;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.PayloadWriter;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.WriteContext;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -282,6 +284,90 @@ public class DirectTxUtils {
         });
     }
 
+    /**
+     * If the current request is the first request of a direct tx, add a 
listener to the {@link PayloadWriter}.
+     *
+     * @param ctx The {@link WriteContext} that holds transactional context 
information.
+     * @param tx The client transaction associated with the request, or {@code 
null} if none.
+     * @param base Request native {@link PayloadWriter}.
+     * @return The {@link PayloadWriter} that should be used on the request.
+     */
+    public static PayloadWriter payloadWriter(WriteContext ctx, @Nullable 
Transaction tx, PayloadWriter base) {
+        if (ctx.firstReqFut != null && tx instanceof ClientLazyTransaction) {
+            return poc -> {
+                base.accept(poc);
+
+                var clientLazyTx = (ClientLazyTransaction) tx;
+                long requestId = poc.requestId();
+                ClientChannel cc = poc.clientChannel();
+                poc.onSent(() -> clientLazyTx.updateRequestInfo(requestId, 
cc));
+            };
+        } else {
+            return base;
+        }
+    }
+
+    /**
+     * Try to handle error on first request. Returns false if not in the 
context of the first request.
+     * This method essentially completes the first-request future with a failed transaction instance.
+     *
+     * @param ctx The {@link WriteContext} that holds transactional context 
information.
+     * @param ch The {@link ReliableChannel} used to resolve the actual 
communication channel.
+     * @return Whether the error was handled or not.
+     */
+    public static boolean tryHandleErrorOnFirstRequest(WriteContext ctx, 
ReliableChannel ch) {
+        if (ctx.firstReqFut == null) {
+            return false;
+        }
+
+        // Create failed transaction.
+        ClientTransaction failed = new ClientTransaction(
+                ctx.channel,
+                ch,
+                -1,
+                ctx.readOnly,
+                null,
+                ctx.pm,
+                null,
+                ch.observableTimestamp(),
+                0
+        );
+
+        failed.fail();
+        ctx.firstReqFut.complete(failed);
+        // Txn was not started, rollback is not required.
+        return true;
+    }
+
+    /**
+     * Handles errors after the first request.
+     * Essentially calls rollback on the transaction and appends any rollback errors to the original error.
+     *
+     * @param ctx The {@link WriteContext} that holds transactional context 
information.
+     * @param tx The client transaction.
+     * @param err The error to be handled.
+     * @param <T> type of the expected future.
+     * @return A completable future that always fails with the original error plus any suppressed errors.
+     */
+    public static <T> CompletableFuture<T> 
handleErrorOnOtherRequests(WriteContext ctx, ClientTransaction tx, Throwable 
err) {
+        CompletableFuture<Void> rollback;
+        if (ctx.enlistmentToken != null) {
+            // In case of direct mapping error need to rollback the tx on 
coordinator.
+            rollback = tx.rollbackAsync();
+        } else {
+            rollback = tx.rollbackAndDiscardDirectMappings(false);
+        }
+
+        return rollback.handle((ignored, err0) -> {
+            if (err0 != null) {
+                err.addSuppressed(err0);
+            }
+
+            sneakyThrow(err);
+            return null;
+        });
+    }
+
     private static CompletableFuture<ClientChannel> resolveChannelInner(
             WriteContext ctx,
             ReliableChannel ch,

Reply via email to