This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b5964b45b0 IGNITE-27838 Fix directly mapped transactions rollback on 
enlistment failure (#7588)
8b5964b45b0 is described below

commit 8b5964b45b09dc82f7af1c056099860af19c1a20
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Thu Feb 26 17:25:58 2026 +0300

    IGNITE-27838 Fix directly mapped transactions rollback on enlistment 
failure (#7588)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../ignite/internal/client/proto/ClientOp.java     |   4 +
 .../internal/client/proto/ErrorExtensions.java     |   2 +
 .../client/proto/ProtocolBitmaskFeature.java       |   7 +-
 .../ignite/client/handler/ItClientHandlerTest.java |   1 +
 .../ignite/client/handler/ClientHandlerModule.java |   3 +-
 .../handler/ClientInboundMessageHandler.java       |  14 +-
 .../handler/requests/table/ClientTableCommon.java  |   8 +
 .../requests/tx/ClientTransactionBeginRequest.java |  17 +-
 .../tx/ClientTransactionDiscardRequest.java        |  77 +++++
 .../internal/client/ClientDelayedAckException.java |   4 +-
 .../client/ClientTransactionInflights.java         |  26 +-
 .../apache/ignite/internal/client/ClientUtils.java |   1 +
 .../ignite/internal/client/ReliableChannel.java    |  19 +-
 .../ignite/internal/client/TcpClientChannel.java   |  30 +-
 .../ignite/internal/client/sql/ClientSql.java      |  56 +++-
 .../ignite/internal/client/table/ClientTable.java  |  45 +--
 .../internal/client/tx/ClientLazyTransaction.java  |  25 +-
 .../internal/client/tx/ClientTransaction.java      | 211 ++++++++++---
 .../ClientTransactionKilledException.java}         |  35 ++-
 .../internal/client/tx/ClientTransactions.java     |   5 +-
 .../ignite/internal/client/tx/DirectTxUtils.java   |   6 +-
 .../apache/ignite/client/ClientMetricsTest.java    |   4 +-
 .../client/ClientTransactionInflightTest.java      |  23 +-
 .../org/apache/ignite/client/RetryPolicyTest.java  |   2 +-
 .../apache/ignite/client/fakes/FakeTxManager.java  |   8 +-
 .../RepeatedFinishClientTransactionTest.java       |  32 +-
 .../ignite/internal/util/CollectionUtils.java      |  22 ++
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../app/client/ItAbstractThinClientTest.java       |  18 +-
 .../app/client/ItThinClientTransactionsTest.java   | 342 +++++++++++++++++++--
 .../distributed/storage/InternalTableImplTest.java |   8 +-
 ...sactionTest.java => ItKillTransactionTest.java} |   2 +-
 .../ignite/internal/tx/InternalTransaction.java    |   4 +
 .../ignite/internal/tx/InternalTxOptions.java      |  21 +-
 .../tx/TransactionExceptionMapperProvider.java     |   2 +
 .../internal/tx/TransactionKilledException.java}   |  36 +--
 .../org/apache/ignite/internal/tx/TxManager.java   |  14 +-
 .../tx/impl/IgniteAbstractTransactionImpl.java     |   4 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  54 +++-
 .../internal/tx/impl/TransactionInflights.java     |  23 --
 .../internal/tx/impl/TxCleanupRequestHandler.java  |  30 ++
 .../internal/tx/impl/TxCleanupRequestSender.java   |   2 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  14 +-
 .../tx/AbstractDeadlockPreventionTest.java         |  18 ++
 .../tx/impl/ReadOnlyTransactionImplTest.java       |   3 +-
 .../tx/impl/ReadWriteTransactionImplTest.java      |   6 +-
 49 files changed, 1050 insertions(+), 246 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index eab048a4a09..5835775d2bc 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -468,6 +468,9 @@ public class ErrorGroups {
 
         /** Operation failed due to replication delayed ack failure. */
         public static final int TX_DELAYED_ACK_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 17);
+
+        /** Transaction was internally killed. This is a retriable state. */
+        public static final int TX_KILLED_ERR = 
TX_ERR_GROUP.registerErrorCode((short) 18);
     }
 
     /** Replicator error group. */
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index d302934264c..cd9729c7f40 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -203,6 +203,9 @@ public class ClientOp {
     /** Get next result set. */
     public static final int SQL_CURSOR_NEXT_RESULT_SET = 74;
 
+    /** Discard request for directly mapped transactions. */
+    public static final int TX_DISCARD = 75;
+
     /** Reserved for extensions: min. */
     @SuppressWarnings("unused")
     public static final int RESERVED_EXTENSION_RANGE_START = 1000;
@@ -257,6 +260,7 @@ public class ClientOp {
         OP_MASK.set(STREAMER_BATCH_SEND);
         OP_MASK.set(TX_COMMIT);
         OP_MASK.set(TX_ROLLBACK);
+        OP_MASK.set(TX_DISCARD);
         OP_MASK.set(TUPLE_GET_ALL);
         OP_MASK.set(TUPLE_CONTAINS_ALL_KEYS);
     }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index c7ce644f608..3ddd836ecea 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -26,4 +26,6 @@ public class ErrorExtensions {
     public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
 
     public static final String DELAYED_ACK = "delayed-ack";
+
+    public static final String TX_KILL = "tx-kill";
 }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 363070971d0..a908678aebc 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -107,7 +107,12 @@ public enum ProtocolBitmaskFeature {
     /**
      * Client supports Partition Awareness for SQL queries with table name in 
the response metadata.
      */
-    SQL_PARTITION_AWARENESS_TABLE_NAME(16);
+    SQL_PARTITION_AWARENESS_TABLE_NAME(16),
+
+    /**
+     * Send discard requests to directly mapped partitions.
+     */
+    TX_DIRECT_MAPPING_SEND_DISCARD(17);
 
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
             EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 746b1262f9b..943ceb20877 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -562,6 +562,7 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             expected.set(14);
             expected.set(15);
             expected.set(16);
+            expected.set(17);
 
             assertEquals(expected, supportedFeatures);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 8bb881142b7..1d37822cc79 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -100,7 +100,8 @@ public class ClientHandlerModule implements 
IgniteComponent, PlatformComputeTran
             ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
             ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
             ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
-            ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME
+            ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
+            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD
     ));
 
     /** Connection id generator.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 94d8da215ac..036719498db 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -120,6 +120,7 @@ import 
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
 import 
org.apache.ignite.client.handler.requests.table.partition.ClientTablePartitionPrimaryReplicasNodesGetRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
+import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionDiscardRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.deployment.DeploymentUnitInfo;
@@ -171,6 +172,7 @@ import 
org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
 import org.apache.ignite.internal.tx.DelayedAckException;
+import org.apache.ignite.internal.tx.TransactionKilledException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.CancelHandle;
@@ -701,10 +703,13 @@ public class ClientInboundMessageHandler
         SchemaVersionMismatchException schemaVersionMismatchException = 
findException(err, SchemaVersionMismatchException.class);
         SqlBatchException sqlBatchException = findException(err, 
SqlBatchException.class);
         DelayedAckException delayedAckException = findException(err, 
DelayedAckException.class);
+        TransactionKilledException transactionKilledException = 
findException(err, TransactionKilledException.class);
 
         err = firstNotNull(
                 schemaVersionMismatchException,
                 sqlBatchException,
+                delayedAckException,
+                transactionKilledException,
                 ExceptionUtils.unwrapCause(err)
         );
 
@@ -746,6 +751,10 @@ public class ClientInboundMessageHandler
             packer.packInt(1); // 1 extension.
             packer.packString(ErrorExtensions.DELAYED_ACK);
             packer.packUuid(delayedAckException.txId());
+        } else if (transactionKilledException != null) {
+            packer.packInt(1); // 1 extension.
+            packer.packString(ErrorExtensions.TX_KILL);
+            packer.packUuid(transactionKilledException.txId());
         } else {
             packer.packNil(); // No extensions.
         }
@@ -947,7 +956,7 @@ public class ClientInboundMessageHandler
                 return ClientJdbcPrimaryKeyMetadataRequest.process(in, 
jdbcQueryEventHandler);
 
             case ClientOp.TX_BEGIN:
-                return ClientTransactionBeginRequest.process(in, txManager, 
resources, metrics, tsTracker);
+                return ClientTransactionBeginRequest.process(in, txManager, 
resources, metrics, tsTracker, notificationSender(requestId));
 
             case ClientOp.TX_COMMIT:
                 return ClientTransactionCommitRequest.process(in, resources, 
metrics, clockService, igniteTables,
@@ -1076,6 +1085,9 @@ public class ClientInboundMessageHandler
             case ClientOp.TABLE_GET_QUALIFIED:
                 return ClientTableGetQualifiedRequest.process(in, 
igniteTables);
 
+            case ClientOp.TX_DISCARD:
+                return ClientTransactionDiscardRequest.process(in, txManager, 
igniteTables);
+
             default:
                 throw new IgniteException(PROTOCOL_ERR, "Unexpected operation 
code: " + opCode);
         }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 159afde916d..325f047d7a5 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionKilledException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxPriority;
 import org.apache.ignite.internal.tx.TxState;
@@ -488,6 +489,13 @@ public class ClientTableCommon {
                     builder = builder.timeoutMillis(timeoutMillis);
                 }
 
+                builder.killClosure(notificationSender == null ? tx -> {} : tx 
-> {
+                    // Exception will be ignored if a client doesn't support 
it.
+                    TransactionKilledException err = new 
TransactionKilledException(tx.id(), txManager);
+
+                    notificationSender.sendNotification(w -> {}, err, 
NULL_HYBRID_TIMESTAMP);
+                });
+
                 InternalTxOptions txOptions = builder.build();
                 var tx = startExplicitTx(tsUpdater, txManager, 
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 73d7266425c..b2550da6a23 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -18,17 +18,20 @@
 package org.apache.ignite.client.handler.requests.tx;
 
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.tx.InternalTxOptions;
+import org.apache.ignite.internal.tx.TransactionKilledException;
 import org.apache.ignite.internal.tx.TxManager;
 
 /**
@@ -42,6 +45,7 @@ public class ClientTransactionBeginRequest {
      * @param txManager Transactions.
      * @param resources Resources.
      * @param metrics Metrics.
+     * @param notificationSender Notification sender used to notify the client when the transaction is killed.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
@@ -49,7 +53,8 @@ public class ClientTransactionBeginRequest {
             TxManager txManager,
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics,
-            HybridTimestampTracker tsTracker
+            HybridTimestampTracker tsTracker,
+            NotificationSender notificationSender
     ) throws IgniteInternalCheckedException {
         boolean readOnly = in.unpackBoolean();
         long timeoutMillis = in.unpackLong();
@@ -63,6 +68,12 @@ public class ClientTransactionBeginRequest {
         InternalTxOptions txOptions = InternalTxOptions.builder()
                 .timeoutMillis(timeoutMillis)
                 .readTimestamp(observableTs)
+                .killClosure(notificationSender == null ? tx -> {} : tx -> {
+                    // Exception will be ignored if a client doesn't support 
it.
+                    TransactionKilledException err = new 
TransactionKilledException(tx.id(), txManager);
+
+                    notificationSender.sendNotification(w -> {}, err, 
NULL_HYBRID_TIMESTAMP);
+                })
                 .build();
 
         var tx = startExplicitTx(tsTracker, txManager, observableTs, readOnly, 
txOptions);
@@ -76,9 +87,7 @@ public class ClientTransactionBeginRequest {
             long resourceId = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
             metrics.transactionsActiveIncrement();
 
-            return CompletableFuture.completedFuture(out -> {
-                out.packLong(resourceId);
-            });
+            return CompletableFuture.completedFuture(out -> 
out.packLong(resourceId));
         } catch (IgniteInternalCheckedException e) {
             tx.rollback();
             throw e;
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
new file mode 100644
index 00000000000..e343917ea71
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.tx;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ResponseWriter;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+
+/**
+ * Client transaction direct mapping discard request.
+ */
+public class ClientTransactionDiscardRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in The unpacker.
+     * @param igniteTables Tables facade.
+     * @return The future.
+     */
+    public static CompletableFuture<ResponseWriter> process(
+            ClientMessageUnpacker in,
+            TxManager txManager,
+            IgniteTablesInternal igniteTables
+    ) throws IgniteInternalCheckedException {
+        Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedPartitions 
= new HashMap<>();
+
+        UUID txId = in.unpackUuid();
+
+        int cnt = in.unpackInt(); // Number of direct enlistments.
+        for (int i = 0; i < cnt; i++) {
+            int tableId = in.unpackInt();
+            int partId = in.unpackInt();
+
+            TableViewInternal table = igniteTables.cachedTable(tableId);
+
+            if (table != null) {
+                ZonePartitionId replicationGroupId = 
table.internalTable().targetReplicationGroupId(partId);
+                enlistedPartitions.computeIfAbsent(replicationGroupId, k -> 
new PendingTxPartitionEnlistment(null, 0))
+                        .addTableId(tableId);
+            }
+        }
+
+        List<EnlistedPartitionGroup> enlistedPartitionGroups = 
enlistedPartitions.entrySet().stream()
+                .map(entry -> new EnlistedPartitionGroup(entry.getKey(), 
entry.getValue().tableIds()))
+                .collect(toList());
+
+        return txManager.discardLocalWriteIntents(enlistedPartitionGroups, 
txId).handle((res, err) -> null);
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
index ef9e06513b5..da09484db96 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
@@ -48,9 +48,9 @@ public class ClientDelayedAckException extends 
IgniteInternalException {
     }
 
     /**
-     * Gets expected schema version.
+     * Returns a related transaction id.
      *
-     * @return Expected schema version.
+     * @return The id.
      */
     public UUID txId() {
         return txId;
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
index fbfeb1d9611..5a597fffb98 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -40,12 +41,13 @@ public class ClientTransactionInflights {
     /**
      * Registers the inflight update for a transaction.
      *
-     * @param txId The transaction id.
+     * @param tx The transaction.
      */
-    public void addInflight(UUID txId) {
-        txCtxMap.compute(txId, (uuid, ctx) -> {
+    public void addInflight(ClientTransaction tx) {
+        txCtxMap.compute(tx.txId(), (uuid, ctx) -> {
             if (ctx == null) {
                 ctx = new TxContext();
+                ctx.tx = tx;
             }
 
             ctx.addInflight();
@@ -125,6 +127,23 @@ public class ClientTransactionInflights {
         txCtxMap.remove(uuid);
     }
 
+    /**
+     * Returns the tracked directly mapped transaction with the given id.
+     *
+     * @param id The transaction id.
+     *
+     * @return The transaction, or {@code null} if none is tracked for this id.
+     */
+    public @Nullable ClientTransaction trackedTransaction(UUID id) {
+        TxContext txContext = txCtxMap.get(id);
+
+        if (txContext != null) {
+            return txContext.tx;
+        }
+
+        return null;
+    }
+
     /**
      * Check if the inflights map contains a given transaction.
      *
@@ -142,6 +161,7 @@ public class ClientTransactionInflights {
         public CompletableFuture<Void> finishFut;
         public long inflights = 0;
         public Throwable err;
+        public ClientTransaction tx;
 
         void addInflight() {
             inflights++;
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index f8661224bc1..3f4aaa122f7 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -141,6 +141,7 @@ public class ClientUtils {
             case ClientOp.TX_BEGIN:
             case ClientOp.TX_COMMIT:
             case ClientOp.TX_ROLLBACK:
+            case ClientOp.TX_DISCARD:
                 return null; // Commit/rollback use owning connection and 
bypass retry mechanism.
 
             case ClientOp.JDBC_SQL_EXEC_PS_BATCH:
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 036d34f43c1..3b54a00f353 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -399,7 +399,24 @@ public final class ReliableChannel implements 
AutoCloseable {
     }
 
     /**
-     * Get the channel.
+     * Gets the existing channel.
+     *
+     * @param nodeName Node name.
+     *
+     * @return The channel or {@code null} if the connection is not available.
+     */
+    public @Nullable ClientChannel getNodeChannel(String nodeName) {
+        ClientChannelHolder holder = nodeChannelsByName.get(nodeName);
+
+        if (holder == null || holder.close) {
+            return null;
+        }
+
+        return holder.getNow();
+    }
+
+    /**
+     * Gets the channel.
      *
      * @param preferredNodeName Preferred node name.
      *
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 7ead14efb64..938b19a4e5a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -65,6 +65,8 @@ import org.apache.ignite.internal.client.proto.HandshakeUtils;
 import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.client.tx.ClientTransactionKilledException;
 import org.apache.ignite.internal.future.timeout.TimeoutObject;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
@@ -100,7 +102,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
             ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
             ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
-            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
+            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
+            ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD
     ));
 
     /** Minimum supported heartbeat interval. */
@@ -603,6 +606,13 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                     ClientDelayedAckException err0 = 
(ClientDelayedAckException) err;
 
                     inflights.removeInflight(err0.txId(), new 
TransactionException(err0.code(), err0.getMessage(), err0.getCause()));
+                } else if (err instanceof ClientTransactionKilledException) {
+                    ClientTransactionKilledException err0 = 
(ClientTransactionKilledException) err;
+
+                    ClientTransaction tx = 
inflights.trackedTransaction(err0.txId());
+                    if (tx != null) {
+                        tx.discardDirectMappings(true);
+                    }
                 }
 
                 // Can't do anything to remove stuck inflight.
@@ -635,8 +645,6 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
         int expectedSchemaVersion = -1;
-        long[] sqlUpdateCounters = null;
-        UUID txId = null;
 
         for (int i = 0; i < extSize; i++) {
             String key = unpacker.unpackString();
@@ -644,24 +652,18 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
                 expectedSchemaVersion = unpacker.unpackInt();
             } else if (key.equals(ErrorExtensions.SQL_UPDATE_COUNTERS)) {
-                sqlUpdateCounters = unpacker.unpackLongArray();
+                return new SqlBatchException(traceId, code, 
unpacker.unpackLongArray(),
+                        errMsg != null ? errMsg : "SQL batch execution error", 
causeWithStackTrace);
             } else if (key.equals(ErrorExtensions.DELAYED_ACK)) {
-                txId = unpacker.unpackUuid();
+                return new ClientDelayedAckException(traceId, code, errMsg, 
unpacker.unpackUuid(), causeWithStackTrace);
+            } else if (key.equals(ErrorExtensions.TX_KILL)) {
+                return new ClientTransactionKilledException(traceId, code, 
errMsg, unpacker.unpackUuid(), causeWithStackTrace);
             } else {
                 // Unknown extension - ignore.
                 unpacker.skipValues(1);
             }
         }
 
-        if (txId != null) {
-            return new ClientDelayedAckException(traceId, code, errMsg, txId, 
causeWithStackTrace);
-        }
-
-        if (sqlUpdateCounters != null) {
-            errMsg = errMsg != null ? errMsg : "SQL batch execution error";
-            return new SqlBatchException(traceId, code, sqlUpdateCounters, 
errMsg, causeWithStackTrace);
-        }
-
         if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
             if (expectedSchemaVersion == -1) {
                 return new IgniteException(
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 2360f0f4f95..92c368a4eea 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.internal.client.sql;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -36,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.client.ClientUtils;
 import org.apache.ignite.internal.client.PartitionMapping;
@@ -147,7 +152,7 @@ public class ClientSql implements IgniteSql {
         try {
             return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -164,7 +169,7 @@ public class ClientSql implements IgniteSql {
         try {
             return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, statement, arguments).join());
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -182,7 +187,7 @@ public class ClientSql implements IgniteSql {
         try {
             return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -200,7 +205,7 @@ public class ClientSql implements IgniteSql {
         try {
             return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, cancellationToken, statement, arguments).join());
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -226,7 +231,7 @@ public class ClientSql implements IgniteSql {
         try {
             return executeBatchAsync(transaction, cancellationToken, 
dmlStatement, batch).join();
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -244,7 +249,7 @@ public class ClientSql implements IgniteSql {
         try {
             executeScriptAsync(cancellationToken, query, arguments).join();
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
     }
 
@@ -366,7 +371,38 @@ public class ClientSql implements IgniteSql {
                 () -> DirectTxUtils.resolveChannel(ctx, ch, 
shouldTrackOperation, tx, mapping),
                 null,
                 false
-        )).exceptionally(ClientSql::handleException);
+        ).handle((BiFunction<AsyncResultSet<T>, Throwable, 
CompletableFuture<AsyncResultSet<T>>>) (r, err) -> {
+            if (err != null) {
+                if (tx == null || !shouldTrackOperation) {
+                    return failedFuture(err);
+                }
+
+                if (ctx.enlistmentToken != null) {
+                    // In case of direct mapping error need to rollback the tx 
on coordinator.
+                    return tx.rollbackAsync().handle((ignored, err0) -> {
+                        if (err0 != null) {
+                            err.addSuppressed(err0);
+                        }
+
+                        sneakyThrow(err);
+                        return null;
+                    });
+                } else {
+                    // In case of unrecoverable error the tx is already rolled 
back on coordinator.
+                    // Need to additionally cleanup directly mapped parts.
+                    return tx.discardDirectMappings(false).handle((ignored, 
err0) -> {
+                        if (err0 != null) {
+                            err.addSuppressed(err0);
+                        }
+
+                        sneakyThrow(err);
+                        return null;
+                    });
+                }
+            }
+
+            return completedFuture(r);
+        })).thenCompose(identity()).exceptionally(ClientSql::handleException);
     }
 
     private static @Nullable PartitionMapping resolveMapping(
@@ -404,7 +440,7 @@ public class ClientSql implements IgniteSql {
             boolean sqlDirectMappingSupported = 
r.clientChannel().protocolContext().isFeatureSupported(SQL_DIRECT_TX_MAPPING);
             boolean sqlMultistatementsSupported = 
r.clientChannel().protocolContext().allFeaturesSupported(SQL_MULTISTATEMENT_SUPPORT);
 
-            DirectTxUtils.readTx(r, ctx, tx, ch.observableTimestamp());
+            DirectTxUtils.readTx(r, ch, ctx, tx, ch.observableTimestamp());
             ClientAsyncResultSet<T> rs = new ClientAsyncResultSet<>(
                     r.clientChannel(), marshallers, r.in(), mapper, 
tryUnpackPaMeta, sqlDirectMappingSupported, sqlMultistatementsSupported
             );
@@ -556,7 +592,7 @@ public class ClientSql implements IgniteSql {
                         .thenCompose(tx -> 
tx.channel().serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter, 
payloadReader))
                         .exceptionally(ClientSql::handleException);
             } catch (TransactionException e) {
-                return CompletableFuture.failedFuture(new 
SqlException(e.traceId(), e.code(), e.getMessage(), e));
+                return failedFuture(new SqlException(e.traceId(), e.code(), 
e.getMessage(), e));
             }
         }
 
@@ -647,7 +683,7 @@ public class ClientSql implements IgniteSql {
             throw new SqlException(te.traceId(), te.code(), te.getMessage(), 
te);
         }
 
-        throw ExceptionUtils.sneakyThrow(ex);
+        throw sneakyThrow(ex);
     }
 
     private static class PaCacheKey {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 04167245bc6..bb499258db0 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static 
org.apache.ignite.internal.client.table.ClientTableMapUtils.mapAndRetry;
 import static 
org.apache.ignite.internal.client.table.ClientTableMapUtils.reduceWithKeepOrder;
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -504,21 +505,21 @@ public class ClientTable implements Table {
                                 .thenCompose(t -> loadSchemaAndReadData(t, 
reader))
                                 .handle((ret, ex) -> {
                                     if (ex != null) {
-                                        // Retry schema errors, if any.
                                         Throwable cause = ex;
 
                                         if (ctx.firstReqFut != null) {
                                             // Create failed transaction.
-                                            ClientTransaction failed = new 
ClientTransaction(ctx.channel, id, ctx.readOnly, null, ctx.pm,
-                                                    null, 
ch.observableTimestamp(), 0);
+                                            ClientTransaction failed = new 
ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
+                                                    ctx.pm, null, 
ch.observableTimestamp(), 0);
                                             failed.fail();
                                             ctx.firstReqFut.complete(failed);
+                                            // Txn was not started, rollback 
is not required.
                                             
fut.completeExceptionally(unwrapCause(ex));
                                             return null;
                                         }
 
-                                        // Don't attempt retrying in case of 
direct mapping. This may be improved in the future.
                                         if (ctx.enlistmentToken == null) {
+                                            // Retry schema errors, if any, in 
proxy mode.
                                             while (cause != null) {
                                                 if (cause instanceof 
ClientSchemaVersionMismatchException) {
                                                     // Retry with specific 
schema version.
@@ -527,13 +528,7 @@ public class ClientTable implements Table {
                                                     
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
responseSchemaRequired,
                                                             provider,
                                                             
retryPolicyOverride, expectedVersion, expectNotifications, tx)
-                                                            
.whenComplete((res0, err0) -> {
-                                                                if (err0 != 
null) {
-                                                                    
fut.completeExceptionally(err0);
-                                                                } else {
-                                                                    
fut.complete(res0);
-                                                                }
-                                                            });
+                                                            
.whenComplete(copyStateTo(fut));
 
                                                     return null;
                                                 } else if 
(schemaVersionOverride == null && cause instanceof UnmappedColumnsException) {
@@ -544,13 +539,7 @@ public class ClientTable implements Table {
                                                     
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue, 
responseSchemaRequired,
                                                             provider,
                                                             
retryPolicyOverride, UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
-                                                            
.whenComplete((res0, err0) -> {
-                                                                if (err0 != 
null) {
-                                                                    
fut.completeExceptionally(err0);
-                                                                } else {
-                                                                    
fut.complete(res0);
-                                                                }
-                                                            });
+                                                            
.whenComplete(copyStateTo(fut));
 
                                                     return null;
                                                 }
@@ -558,9 +547,23 @@ public class ClientTable implements Table {
                                                 cause = cause.getCause();
                                             }
 
-                                            fut.completeExceptionally(ex);
+                                            if (tx0 == null) {
+                                                fut.completeExceptionally(ex);
+                                            } else {
+                                                // In case of unrecoverable 
error the tx is already rolled back on coordinator.
+                                                // We need to additionally 
cleanup directly mapped parts.
+                                                
tx0.discardDirectMappings(false).handle((ignored, err0) -> {
+                                                    if (err0 != null) {
+                                                        ex.addSuppressed(err0);
+                                                    }
+
+                                                    
fut.completeExceptionally(ex);
+
+                                                    return (T) null;
+                                                });
+                                            }
                                         } else {
-                                            // In case of direct mapping 
failure for any reason try to roll back the transaction.
+                                            // In case of direct mapping error 
we need to rollback the tx on coordinator.
                                             
tx0.rollbackAsync().handle((ignored, err0) -> {
                                                 if (err0 != null) {
                                                     ex.addSuppressed(err0);
@@ -597,7 +600,7 @@ public class ClientTable implements Table {
             @Nullable ClientTransaction tx0
     ) {
         ClientMessageUnpacker in1 = in.in();
-        DirectTxUtils.readTx(in, ctx, tx0, ch.observableTimestamp());
+        DirectTxUtils.readTx(in, ch, ctx, tx0, ch.observableTimestamp());
 
         int schemaVer = in1.unpackInt();
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 46ddef2f9bc..1b2d70f3a21 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ViewUtils;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
@@ -68,14 +70,7 @@ public class ClientLazyTransaction implements Transaction {
 
     @Override
     public void commit() throws TransactionException {
-        var tx0 = tx;
-
-        if (tx0 == null) {
-            // No operations were performed, nothing to commit.
-            return;
-        }
-
-        tx0.join().commit();
+        ViewUtils.sync(commitAsync());
     }
 
     @Override
@@ -92,14 +87,7 @@ public class ClientLazyTransaction implements Transaction {
 
     @Override
     public void rollback() throws TransactionException {
-        var tx0 = tx;
-
-        if (tx0 == null) {
-            // No operations were performed, nothing to rollback.
-            return;
-        }
-
-        tx0.join().rollback();
+        ViewUtils.sync(rollbackAsync());
     }
 
     @Override
@@ -228,4 +216,9 @@ public class ClientLazyTransaction implements Transaction {
     @Nullable EnumSet<ClientInternalTxOptions> options() {
         return txOptions;
     }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index d1c31601921..414ef106c4e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -18,20 +18,25 @@
 package org.apache.ignite.internal.client.tx;
 
 import static java.util.function.Function.identity;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ViewUtils.sync;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_KILLED_ERR;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.PartitionMapping;
@@ -43,6 +48,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
@@ -66,6 +72,9 @@ public class ClientTransaction implements Transaction {
     /** Rolled back state. */
     private static final int STATE_ROLLED_BACK = 2;
 
+    /** Kill state. */
+    private static final int STATE_KILLED = 3;
+
     /** Channel that the transaction belongs to. */
     @IgniteToStringExclude
     private final ClientChannel ch;
@@ -75,7 +84,7 @@ public class ClientTransaction implements Transaction {
 
     /** The future used on repeated commit/rollback. */
     @IgniteToStringExclude
-    private final AtomicReference<CompletableFuture<Void>> finishFut = new 
AtomicReference<>();
+    private volatile CompletableFuture<Void> finishFut;
 
     /** State. */
     private final AtomicInteger state = new AtomicInteger(STATE_OPEN);
@@ -105,10 +114,13 @@ public class ClientTransaction implements Transaction {
     @IgniteToStringExclude
     private final ReentrantReadWriteLock enlistPartitionLock = new 
ReentrantReadWriteLock();
 
+    private final ReliableChannel reliableChannel;
+
     /**
      * Constructor.
      *
-     * @param ch Channel that the transaction belongs to.
+     * @param ch Channel that the transaction belongs to (coordinator 
connection).
+     * @param reliableChannel Channels repository.
      * @param id Transaction id.
      * @param isReadOnly Read-only flag.
      * @param txId Transaction id.
@@ -119,6 +131,7 @@ public class ClientTransaction implements Transaction {
      */
     public ClientTransaction(
             ClientChannel ch,
+            ReliableChannel reliableChannel,
             long id,
             boolean isReadOnly,
             UUID txId,
@@ -128,6 +141,7 @@ public class ClientTransaction implements Transaction {
             long timeout
     ) {
         this.ch = ch;
+        this.reliableChannel = reliableChannel;
         this.id = id;
         this.isReadOnly = isReadOnly;
         this.txId = txId;
@@ -206,14 +220,92 @@ public class ClientTransaction implements Transaction {
         sync(commitAsync());
     }
 
+    /**
+     * Discards the directly mapped transaction fragments in case of 
coordinator side transaction invalidation
+     * (either kill or implicit rollback due to mapping failure, see 
postEnlist).
+     *
+     * @param killed Whether to finish as killed (else rolled back).
+     *
+     * @return Existing finish future, or one completed after the discard.
+     */
+    public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+        enlistPartitionLock.writeLock().lock();
+
+        try {
+            if (finishFut != null) {
+                return finishFut;
+            } else {
+                finishFut = new CompletableFuture<>();
+            }
+        } finally {
+            enlistPartitionLock.writeLock().unlock();
+        }
+
+        return sendDiscardRequests().handle((r, e) -> {
+            setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+            ch.inflights().erase(txId());
+            this.finishFut.complete(null);
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> sendDiscardRequests() {
+        assert finishFut != null;
+
+        if 
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+            return nullCompletedFuture();
+        }
+
+        Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+        for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String, 
Long>>> entry : enlisted.entrySet()) {
+            IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+            if (info == null) {
+                continue; // Ignore incomplete enlistments.
+            }
+
+            enlistments.computeIfAbsent(info.get1(), k -> new 
ArrayList<>()).add(entry.getKey());
+        }
+
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(enlistments.size());
+
+        for (Entry<String, List<TablePartitionId>> entry : 
enlistments.entrySet()) {
+            ClientChannel ch = reliableChannel.getNodeChannel(entry.getKey());
+
+            if (ch == null) {
+                // Connection is lost, the transaction will be cleaned up by 
other means.
+                // TODO https://issues.apache.org/jira/browse/IGNITE-27651
+                continue;
+            }
+
+            CompletableFuture<Void> discardFut = 
ch.serviceAsync(ClientOp.TX_DISCARD, w -> {
+                int cnt = entry.getValue().size();
+                w.out().packUuid(txId);
+                w.out().packInt(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    w.out().packInt(entry.getValue().get(i).tableId());
+                    w.out().packInt(entry.getValue().get(i).partitionId());
+                }
+            }, null);
+
+            futures.add(discardFut);
+        }
+
+        return allOf(futures);
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> commitAsync() {
         enlistPartitionLock.writeLock().lock();
 
         try {
-            if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
-                return finishFut.get();
+            if (finishFut != null) {
+                return finishFut;
+            } else {
+                finishFut = new CompletableFuture<>();
             }
         } finally {
             enlistPartitionLock.writeLock().unlock();
@@ -245,9 +337,23 @@ public class ClientTransaction implements Transaction {
         }).thenCompose(identity());
 
         mainFinishFut.handle((res, e) -> {
+            if (e != null) {
+                // Failed to commit for some reason, need to discard direct 
mappings.
+                Throwable cause = ExceptionUtils.unwrapCause(e);
+
+                sendDiscardRequests().handle((r, e0) -> {
+                    setState(cause instanceof ClientTransactionKilledException 
? STATE_KILLED : STATE_ROLLED_BACK);
+                    ch.inflights().erase(txId());
+                    this.finishFut.complete(null);
+                    return null;
+                });
+
+                return null;
+            }
+
             setState(STATE_COMMITTED);
             ch.inflights().erase(txId());
-            this.finishFut.get().complete(null);
+            this.finishFut.complete(null);
             return null;
         });
 
@@ -266,8 +372,10 @@ public class ClientTransaction implements Transaction {
         enlistPartitionLock.writeLock().lock();
 
         try {
-            if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
-                return finishFut.get();
+            if (finishFut != null) {
+                return finishFut;
+            } else {
+                finishFut = new CompletableFuture<>();
             }
         } finally {
             enlistPartitionLock.writeLock().unlock();
@@ -285,7 +393,7 @@ public class ClientTransaction implements Transaction {
         mainFinishFut.handle((res, e) -> {
             setState(STATE_ROLLED_BACK);
             ch.inflights().erase(txId());
-            this.finishFut.get().complete(null);
+            this.finishFut.complete(null);
             return null;
         });
 
@@ -346,9 +454,22 @@ public class ClientTransaction implements Transaction {
             return clientTx;
         }
 
-        throw new TransactionException(
-                TX_ALREADY_FINISHED_ERR,
-                format("Transaction is already finished [tx={}].", clientTx));
+        throw exceptionForState(state, clientTx);
+    }
+
+    private static TransactionException exceptionForState(int state, 
ClientTransaction clientTx) {
+        assert state > STATE_OPEN;
+
+        if (state == STATE_KILLED) {
+            return new TransactionException(
+                    TX_KILLED_ERR,
+                    format("Transaction is killed [tx={}].", clientTx));
+        } else {
+            return new TransactionException(
+                    TX_ALREADY_FINISHED_ERR,
+                    format("Transaction is already finished [tx={}, 
committed={}].", clientTx,
+                            state == STATE_COMMITTED ? "true" : "false"));
+        }
     }
 
     static IgniteException unsupportedTxTypeException(Transaction tx) {
@@ -362,8 +483,8 @@ public class ClientTransaction implements Transaction {
     }
 
     private void checkEnlistPossible() {
-        if (finishFut.get() != null) {
-            throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}].", this));
+        if (finishFut != null) {
+            throw exceptionForState(state.get(), this);
         }
     }
 
@@ -379,39 +500,36 @@ public class ClientTransaction implements Transaction {
      */
     public CompletableFuture<IgniteBiTuple<String, Long>> 
enlistFuture(ReliableChannel ch, ClientChannel opChannel, PartitionMapping pm,
             boolean trackOperation) {
-        if (!enlistPartitionLock.readLock().tryLock()) {
-            throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}].", this));
-        }
+        enlistPartitionLock.readLock().lock();
 
-        checkEnlistPossible();
-
-        boolean[] first = {false};
-
-        TablePartitionId tablePartitionId = new TablePartitionId(pm.tableId(), 
pm.partition());
+        try {
+            checkEnlistPossible();
 
-        CompletableFuture<IgniteBiTuple<String, Long>> fut = 
enlisted.compute(tablePartitionId, (k, v) -> {
-            if (v == null) {
-                first[0] = true;
-                return new CompletableFuture<>();
-            } else {
-                return v;
-            }
-        });
+            boolean[] first = {false};
 
-        enlistPartitionLock.readLock().unlock();
+            TablePartitionId tablePartitionId = new 
TablePartitionId(pm.tableId(), pm.partition());
 
-        // Re-check after unlock.
-        checkEnlistPossible();
+            CompletableFuture<IgniteBiTuple<String, Long>> fut = 
enlisted.compute(tablePartitionId, (k, v) -> {
+                if (v == null) {
+                    first[0] = true;
+                    return new CompletableFuture<>();
+                } else {
+                    return v;
+                }
+            });
 
-        if (trackOperation) {
-            ch.inflights().addInflight(txId);
-        }
+            if (trackOperation) {
+                ch.inflights().addInflight(this);
+            }
 
-        if (first[0]) {
-            // For the first request return completed future.
-            return CompletableFuture.completedFuture(new IgniteBiTuple<>(null, 
null));
-        } else {
-            return fut;
+            if (first[0]) {
+                // For the first request return completed future.
+                return CompletableFuture.completedFuture(new 
IgniteBiTuple<>(null, null));
+            } else {
+                return fut;
+            }
+        } finally {
+            enlistPartitionLock.readLock().unlock();
         }
     }
 
@@ -464,11 +582,20 @@ public class ClientTransaction implements Transaction {
     /** Fail the transaction. */
     public void fail() {
         state.set(STATE_ROLLED_BACK);
-        finishFut.set(nullCompletedFuture());
+        finishFut = nullCompletedFuture();
     }
 
     @Override
     public String toString() {
         return S.toString(this);
     }
+
+    /**
+     * Tells whether the transaction is in the killed state.
+     *
+     * @return {@code true} if the current state is {@code STATE_KILLED}.
+     */
+    public boolean killed() {
+        return state.get() == STATE_KILLED;
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
similarity index 55%
copy from 
modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
copy to 
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
index ef9e06513b5..bbbe3eee7fe 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.internal.client.tx;
 
 import java.util.UUID;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Holds the transaction id and the cause for delayed replication ack failure.
+ * Reports a killed transaction.
  */
-public class ClientDelayedAckException extends IgniteInternalException {
+public class ClientTransactionKilledException extends TransactionException {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
@@ -36,21 +36,38 @@ public class ClientDelayedAckException extends 
IgniteInternalException {
      *
      * @param traceId Trace ID.
      * @param code Error code.
-     *
      * @param message String message.
      * @param txId Related transaction id.
-     * @param cause Cause.
+     * @param cause The cause.
      */
-    ClientDelayedAckException(UUID traceId, int code, @Nullable String 
message, UUID txId, @Nullable Throwable cause) {
+    public ClientTransactionKilledException(UUID traceId, int code, @Nullable 
String message, UUID txId, @Nullable Throwable cause) {
         super(traceId, code, message, cause);
 
         this.txId = txId;
     }
 
     /**
-     * Gets expected schema version.
+     * Constructor (for copying purposes).
+     *
+     * @param traceId Trace ID.
+     * @param code Error code.
+     * @param message String message.
+     * @param cause The cause.
+     */
+    public ClientTransactionKilledException(UUID traceId, int code, @Nullable 
String message, @Nullable Throwable cause) {
+        super(traceId, code, message, cause);
+
+        if (cause instanceof ClientTransactionKilledException) {
+            this.txId = ((ClientTransactionKilledException) cause).txId;
+        } else {
+            throw new IllegalArgumentException("Copy constructor should only 
be called with the source exception");
+        }
+    }
+
+    /**
+     * Returns a related transaction id.
      *
-     * @return Expected schema version.
+     * @return The id.
      */
     public UUID txId() {
         return txId;
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index 71557894d5c..cf445a80cb9 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -90,7 +90,7 @@ public class ClientTransactions implements IgniteTransactions 
{
                     w.out().packLong(timeout);
                     w.out().packLong(observableTimestamp);
                 },
-                r -> readTx(r, readOnly, timeout),
+                r -> readTx(r, ch, readOnly, timeout),
                 channelResolver,
                 null,
                 false);
@@ -98,6 +98,7 @@ public class ClientTransactions implements IgniteTransactions 
{
 
     private static ClientTransaction readTx(
             PayloadInputChannel r,
+            ReliableChannel ch,
             boolean isReadOnly,
             long timeout
     ) {
@@ -105,6 +106,6 @@ public class ClientTransactions implements 
IgniteTransactions {
 
         long id = in.unpackLong();
 
-        return new ClientTransaction(r.clientChannel(), id, isReadOnly, EMPTY, 
null, EMPTY, null, timeout);
+        return new ClientTransaction(r.clientChannel(), ch, id, isReadOnly, 
EMPTY, null, EMPTY, null, timeout);
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 51e4a53941c..8e0ea9085a7 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -92,8 +92,8 @@ public class DirectTxUtils {
             IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> tuple 
= ClientLazyTransaction.ensureStarted(tx, ch,
                     piggybackSupported.test(ch0) ? null : () -> 
completedFuture(ch0));
 
-            // If this is the first direct request in transaction, it will 
also piggyback a transaction start.
             if (tuple.get2()) {
+                // If this is the first direct request in transaction, it will 
also piggyback a transaction start.
                 ctx.pm = pm;
                 ctx.readOnly = tx.isReadOnly();
                 ctx.channel = ch0;
@@ -186,12 +186,14 @@ public class DirectTxUtils {
      * ensuring the transaction state is correctly initialized or synchronized 
with the server.
      *
      * @param payloadChannel The {@link PayloadInputChannel} containing the 
server's response data.
+     * @param ch Channels repository.
      * @param ctx The {@link WriteContext} holding the transaction request 
state and response future.
      * @param tx The current {@link ClientTransaction}, or {@code null} if 
piggybacking a new transaction.
      * @param observableTimestamp A tracker for observable timestamps used for 
transaction visibility and causality.
      */
     public static void readTx(
             PayloadInputChannel payloadChannel,
+            ReliableChannel ch,
             WriteContext ctx,
             @Nullable ClientTransaction tx,
             HybridTimestampTracker observableTimestamp
@@ -206,7 +208,7 @@ public class DirectTxUtils {
             long timeout = in.unpackLong();
 
             ClientTransaction startedTx =
-                    new ClientTransaction(payloadChannel.clientChannel(), id, 
ctx.readOnly, txId, ctx.pm, coordId, observableTimestamp,
+                    new ClientTransaction(payloadChannel.clientChannel(), ch, 
id, ctx.readOnly, txId, ctx.pm, coordId, observableTimestamp,
                             timeout);
 
             ctx.firstReqFut.complete(startedTx);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index 24f8a114d34..41422194096 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -263,14 +263,14 @@ public class ClientMetricsTest extends 
BaseIgniteAbstractTest {
         server = AbstractClientTest.startServer(1000, new FakeIgnite());
         client = clientBuilder().build();
 
-        assertEquals(17, metrics().bytesSent());
+        assertEquals(18, metrics().bytesSent());
 
         long handshakeReceived = metrics().bytesReceived();
         assertThat(handshakeReceived, greaterThanOrEqualTo(77L));
 
         client.tables().tables();
 
-        assertEquals(23, metrics().bytesSent());
+        assertEquals(24, metrics().bytesSent());
         assertEquals(handshakeReceived + 21, metrics().bytesReceived());
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
index 2590cc1911e..3dc17f6c1f7 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
@@ -23,24 +23,35 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.client.ClientTransactionInflights;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests inflight tracker.
  */
-public class ClientTransactionInflightTest {
+public class ClientTransactionInflightTest extends BaseIgniteAbstractTest {
     private final UUID txId = UUID.randomUUID();
 
     private final ClientTransactionInflights inflights = new 
ClientTransactionInflights();
 
+    private final ClientTransaction transaction;
+
+    {
+        transaction = mock(ClientTransaction.class);
+        Mockito.when(transaction.txId()).thenReturn(txId);
+    }
+
     @Test
     public void testState1() {
-        inflights.addInflight(txId);
+        inflights.addInflight(transaction);
 
         assertEquals(1, inflights.map().get(txId).inflights);
     }
@@ -52,7 +63,7 @@ public class ClientTransactionInflightTest {
 
     @Test
     public void testState3() {
-        inflights.addInflight(txId);
+        inflights.addInflight(transaction);
         inflights.removeInflight(txId, null);
 
         CompletableFuture<Void> fut = inflights.finishFuture(txId);
@@ -61,7 +72,7 @@ public class ClientTransactionInflightTest {
 
     @Test
     public void testState4() {
-        inflights.addInflight(txId);
+        inflights.addInflight(transaction);
 
         CompletableFuture<Void> fut = inflights.finishFuture(txId);
         assertFalse(fut.isDone());
@@ -73,7 +84,7 @@ public class ClientTransactionInflightTest {
 
     @Test
     public void testState5() {
-        inflights.addInflight(txId);
+        inflights.addInflight(transaction);
 
         CompletableFuture<Void> fut = inflights.finishFuture(txId);
         assertFalse(fut.isDone());
@@ -85,7 +96,7 @@ public class ClientTransactionInflightTest {
 
     @Test
     public void testState6() {
-        inflights.addInflight(txId);
+        inflights.addInflight(transaction);
         inflights.removeInflight(txId, new TestException());
 
         CompletableFuture<Void> fut = inflights.finishFuture(txId);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index be5ca44560b..8b77025a8ee 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -235,7 +235,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest 
{
             }
         }
 
-        long expectedNullCount = 24;
+        long expectedNullCount = 25;
 
         String msg = nullOpFields.size()
                 + " operation codes do not have public equivalent. When adding 
new codes, update ClientOperationType too. Missing ops: "
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 4955eb80d04..938db2fd479 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -253,7 +254,7 @@ public class FakeTxManager implements TxManager {
     @Override
     public CompletableFuture<Void> cleanup(
             ZonePartitionId commitPartitionId,
-            Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+            Map<ZonePartitionId, ? extends PartitionEnlistment> 
enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
@@ -287,6 +288,11 @@ public class FakeTxManager implements TxManager {
         return nullCompletedFuture();
     }
 
+    @Override
+    public CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+        return nullCompletedFuture();
+    }
+
     @Override
     public int lockRetryCount() {
         return 0;
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index e1b50698efd..5d750fa5fa0 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -36,12 +37,17 @@ import static org.mockito.Mockito.when;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
 /**
@@ -63,7 +69,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
             }
         };
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, null, 1, 
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
         CompletableFuture<Object> fut = new CompletableFuture<>();
 
@@ -109,7 +115,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
             }
         };
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, null, 1, 
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
         CompletableFuture<Object> fut = new CompletableFuture<>();
 
@@ -151,7 +157,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
         when(clientChannel.protocolContext()).thenReturn(ctx);
         when(clientChannel.serviceAsync(anyInt(), any(), 
any())).thenReturn(failedFuture(new Exception("Expected exception.")));
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, null, 1, 
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
         CompletableFuture<Object> fut = new CompletableFuture<>();
 
@@ -182,7 +188,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
         when(clientChannel.protocolContext()).thenReturn(ctx);
         when(clientChannel.serviceAsync(anyInt(), any(), 
any())).thenReturn(failedFuture(new Exception("Expected exception.")));
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, null, 1, 
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
         CompletableFuture<Object> fut = new CompletableFuture<>();
 
@@ -217,7 +223,7 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
 
         PartitionMapping pm = new PartitionMapping(1, "test", 1);
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, ch, 1, 
false, randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
         tx.commit();
 
@@ -233,8 +239,16 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
         }
     }
 
-    @Test
-    public void testEnlistFailAfterRollback() {
+    private static Stream<Arguments> rollbackClosureFactory() {
+        return Stream.of(
+                argumentSet("rollback", (Consumer<ClientTransaction>) 
ClientTransaction::rollback),
+                argumentSet("discard", (Consumer<ClientTransaction>) 
clientTransaction -> clientTransaction.discardDirectMappings(false))
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("rollbackClosureFactory")
+    public void testEnlistFailAfterRollback(Consumer<ClientTransaction> 
rollbackClo) {
         ReliableChannel ch = mock(ReliableChannel.class, 
Mockito.RETURNS_DEEP_STUBS);
 
         TestClientChannel clientChannel = mock(TestClientChannel.class, 
Mockito.RETURNS_DEEP_STUBS);
@@ -248,9 +262,9 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
 
         PartitionMapping pm = new PartitionMapping(1, "test", 1);
 
-        ClientTransaction tx = new ClientTransaction(clientChannel, 1, false, 
randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
+        ClientTransaction tx = new ClientTransaction(clientChannel, ch, 1, 
false, randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
 
-        tx.rollback();
+        rollbackClo.accept(tx);
 
         WriteContext wc = new WriteContext(emptyTracker(), 
ClientOp.TUPLE_UPSERT);
         wc.pm = pm;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 2e73be1e17a..04d7af04902 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -96,6 +96,28 @@ public final class CollectionUtils {
         return iter == null || !iter.hasNext();
     }
 
+    /**
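+     * Counts the remaining elements of an iterator, consuming it in the process.
+     *
+     * @param iter The iterator, may be {@code null}.
+     *
+     * @return The number of remaining elements, or {@code 0} if the iterator is {@code null}.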
+     */
+    public static int count(@Nullable Iterator<?> iter) {
+        if (iter == null) {
+            return 0;
+        }
+
+        int cnt = 0;
+
+        while (iter.hasNext()) {
+            iter.next();
+            cnt++;
+        }
+
+        return cnt;
+    }
+
     /**
      * Gets first element from given list or returns {@code null} if list is 
empty.
      *
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 23f91076ba3..4bca391ce42 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -139,6 +139,7 @@ enum class code : underlying_t {
     TX_STALE_READ_ONLY_OPERATION = 0x7000f,
     TX_ALREADY_FINISHED_WITH_TIMEOUT = 0x70010,
     TX_DELAYED_ACK = 0x70011,
+    TX_KILLED = 0x70012,
 
     // Replicator group. Group code: 8
     REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 5e6581e7378..fd177463f2f 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -210,6 +210,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::TX_STALE_READ_ONLY_OPERATION:
         case error::code::TX_ALREADY_FINISHED_WITH_TIMEOUT:
         case error::code::TX_DELAYED_ACK:
+        case error::code::TX_KILLED:
             return sql_state::S25000_INVALID_TRANSACTION_STATE;
 
         // Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 80ebf868d8d..06abe213e46 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -370,6 +370,9 @@ namespace Apache.Ignite
 
             /// <summary> TxDelayedAck error. </summary>
             public const int TxDelayedAck = (GroupCode << 16) | (17 & 0xFFFF);
+
+            /// <summary> TxKilled error. </summary>
+            public const int TxKilled = (GroupCode << 16) | (18 & 0xFFFF);
         }
 
         /// <summary> Replicator errors. </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 47fa23e374f..d2517738151 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
 import org.apache.ignite.InitParameters;
@@ -84,6 +85,17 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
     private IgniteClient client;
     private List<IgniteServer> nodes;
 
+    /**
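+     * Tells whether server exception stack traces (and client connector metrics) are enabled in a node's config.
+     *
+     * @param nodeIdx Index of the node being configured.
+     *
+     * @return {@code true} to enable stack traces for the node.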
+     */
+    protected boolean enableTracesPredicate(int nodeIdx) {
+        return nodeIdx == 1;
+    }
+
     /**
      * Before all.
      */
@@ -97,7 +109,7 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
                     "ignite {\n"
                             + "  network.port: " + (3344 + i) + ",\n"
                             + "  network.nodeFinder.netClusterNodes: [ 
\"localhost:3344\" ]\n"
-                            + (i == 1 ? ("  
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+                            + (enableTracesPredicate(i) ? ("  
clientConnector.sendServerExceptionStackTraceToClient: true\n"
                             + "  clientConnector.metricsEnabled: true\n") : "")
                             + "  clientConnector.port: " + (10800 + i) + ",\n"
                             + "  rest.port: " + (10300 + i) + ",\n"
@@ -211,6 +223,10 @@ public abstract class ItAbstractThinClientTest extends 
BaseIgniteAbstractTest {
         return startedNodes.get(idx);
     }
 
+    protected Ignite server(ClusterNode node) {
+        return IntStream.range(0, nodes()).mapToObj(this::server).filter(n -> 
n.name().equals(node.name())).findFirst().orElseThrow();
+    }
+
     protected ClusterNode node(int idx) {
         return sortedNodes().get(idx);
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 4445fe3444a..e8fa68cacb2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.runner.app.client;
 
+import static java.lang.String.format;
 import static java.util.Collections.emptyList;
 import static java.util.Comparator.comparing;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.startsWith;
@@ -32,13 +34,16 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -50,22 +55,29 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.ClientTransactionInflights;
 import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
 import org.apache.ignite.internal.client.table.ClientTable;
 import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
 import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
@@ -78,8 +90,11 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 
@@ -393,13 +408,15 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         tx.rollback();
     }
 
-    @Test
-    void testKillTransaction() {
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    void testKillTransaction(KillTestContext ctx) {
         @SuppressWarnings("resource") IgniteClient client = client();
-        KeyValueView<Integer, String> kvView = kvView();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
 
         Transaction tx = client.transactions().begin();
-        kvView.put(tx, 1, "1");
+        Tuple key = key(0);
+        assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
 
         try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT 
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
             cursor.forEachRemaining(r -> {
@@ -411,7 +428,152 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         TransactionException ex = assertThrows(TransactionException.class, 
tx::commit);
 
         assertThat(ex.getMessage(), startsWith("Transaction is killed"));
-        assertEquals("IGN-TX-13", ex.codeAsString());
+        assertEquals("IGN-TX-18", ex.codeAsString());
+
+        assertThat(kvView.removeAsync(key), willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    void testEnlistmentAfterKillTransaction(KillTestContext ctx) {
+        @SuppressWarnings("resource") IgniteClient client = client();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table().partitionDistribution().primaryReplicasAsync().join();
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        List<Tuple> tuples = generateKeysForNode(100, 10, map, 
server0.cluster().localNode(), table());
+
+        Transaction tx = client.transactions().begin();
+        Tuple key = tuples.get(0);
+        assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
+
+        try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT 
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+            cursor.forEachRemaining(r -> {
+                String txId = r.stringValue("TRANSACTION_ID");
+                client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+            });
+        }
+
+        Tuple key2 = tuples.get(1);
+        assertThat(ctx.put.apply(client, tx, key2), 
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is 
killed"));
+
+        assertThat(tx.commitAsync(), willSucceedFast());
+
+        // Validate lock possibility.
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)), 
willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    void testKillTransactionAfterSecondEnlistment(KillTestContext ctx) {
+        @SuppressWarnings("resource") IgniteClient client = client();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table().partitionDistribution().primaryReplicasAsync().join();
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        List<Tuple> tuples = generateKeysForNode(100, 10, map, 
server0.cluster().localNode(), table());
+
+        Transaction tx = client.transactions().begin();
+        Tuple key = tuples.get(0);
+        assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
+
+        Tuple key2 = tuples.get(1);
+        assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+        try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT 
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+            cursor.forEachRemaining(r -> {
+                String txId = r.stringValue("TRANSACTION_ID");
+                client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+            });
+        }
+
+        assertThat(tx.commitAsync(), 
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is 
killed"));
+
+        // Validate lock possibility.
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)), 
willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    void testDirectEnlistmentAfterKillTransaction(KillTestContext ctx) {
+        @SuppressWarnings("resource") IgniteClient client = client();
+        ClientSql sql = (ClientSql) client.sql();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table().partitionDistribution().primaryReplicasAsync().join();
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+        List<Tuple> tuples = generateKeysForNode(100, 10, map, 
server0.cluster().localNode(), table());
+        List<Tuple> tuples2 = generateKeysForNode(100, 10, map, 
server1.cluster().localNode(), table());
+
+        // Init mappings.
+        Tuple key0 = tuples.get(0);
+        sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", 
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                key0.intValue(0), key0.intValue(0) + "");
+        await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> 
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+        Transaction tx = client.transactions().begin();
+        Tuple key1 = tuples.get(1);
+        assertThat(ctx.put.apply(client, tx, key1), willSucceedFast());
+
+        try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT 
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+            cursor.forEachRemaining(r -> {
+                String txId = r.stringValue("TRANSACTION_ID");
+                client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+            });
+        }
+
+        // No direct enlistments exist at this point so put will succeed. 
This can be improved.
+        Tuple key2 = tuples2.get(0);
+        assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+        assertThat(tx.commitAsync(), 
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is 
killed"));
+
+        // Validate lock possibility.
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1, 
key2)), willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    void testKillSqlTransactionAfterSecondDirectEnlistment(KillTestContext 
ctx) {
+        @SuppressWarnings("resource") IgniteClient client = client();
+        ClientSql sql = (ClientSql) client.sql();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table().partitionDistribution().primaryReplicasAsync().join();
+        IgniteImpl server0 = unwrapIgniteImpl(server(0));
+        IgniteImpl server1 = unwrapIgniteImpl(server(1));
+        List<Tuple> tuples = generateKeysForNode(100, 10, map, 
server0.cluster().localNode(), table());
+        List<Tuple> tuples2 = generateKeysForNode(100, 10, map, 
server1.cluster().localNode(), table());
+
+        // Init mappings.
+        Tuple key0 = tuples.get(0);
+        sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", 
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                key0.intValue(0), key0.intValue(0) + "");
+        await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> 
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+        ClientLazyTransaction tx = (ClientLazyTransaction) 
client.transactions().begin();
+        Tuple key1 = tuples.get(1);
+        assertThat(ctx.put.apply(client, tx, key1), willSucceedFast());
+
+        Tuple key2 = tuples2.get(0);
+        assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+        try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT 
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+            cursor.forEachRemaining(r -> {
+                String txId = r.stringValue("TRANSACTION_ID");
+                client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+            });
+        }
+
+        await().atMost(3, TimeUnit.SECONDS).until(() -> 
tx.startedTx().killed());
+
+        assertThat(tx.commitAsync(), willSucceedFast());
+
+        // Validate lock possibility.
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1, 
key2)), willSucceedFast());
     }
 
     @Test
@@ -425,7 +587,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         assertEquals(PARTITIONS, map.size());
 
-        int k = 111; // Avoid intersection with previous tests.
+        int k = 111111; // Avoid intersection with previous tests.
 
         Map<Tuple, Tuple> txMap = new HashMap<>();
 
@@ -835,7 +997,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         // Expecting each write operation to trigger add/remove events.
         int exp = 20 + partitions;
-        Mockito.verify(spyed, 
Mockito.times(exp)).addInflight(tx0.startedTx().txId());
+        Mockito.verify(spyed, Mockito.times(exp)).addInflight(tx0.startedTx());
         Mockito.verify(spyed, 
Mockito.times(exp)).removeInflight(Mockito.eq(tx0.startedTx().txId()), 
Mockito.any());
 
         // Check if all locks are released.
@@ -1183,38 +1345,154 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         assertNull(val, "Read-only transaction should not see values committed 
after its start");
     }
 
-    @Test
-    public void testRollbackDoesNotBlockOnLockConflict() {
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    public void testRollbackDoesNotBlockOnLockConflict(KillTestContext ctx) {
         ClientTable table = (ClientTable) table();
+        ClientSql sql = (ClientSql) client().sql();
         KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
 
         Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
-        List<Tuple> tuples0 = generateKeysForPartition(800, 10, map, 0, table);
+        Entry<Partition, ClusterNode> mapping = 
map.entrySet().iterator().next();
+        List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int) 
mapping.getKey().id(), table);
+        Ignite server = server(mapping.getValue());
+
+        // Init SQL mappings.
+        Tuple key0 = tuples0.get(0);
+        sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", 
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                key0.intValue(0), key0.intValue(0) + "");
+        await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> 
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
 
         ClientLazyTransaction olderTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
         ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
 
-        Tuple key = tuples0.get(0);
-        Tuple key2 = tuples0.get(1);
-        Tuple val = val("1");
-        Tuple val2 = val("2");
+        Tuple key = tuples0.get(1);
+        Tuple key2 = tuples0.get(2);
 
-        kvView.put(olderTxProxy, key, val);
+        assertThat(ctx.put.apply(client(), olderTxProxy, key), 
willSucceedFast());
         ClientTransaction olderTx = olderTxProxy.startedTx();
 
-        kvView.put(youngerTxProxy, key2, val2);
+        assertThat(ctx.put.apply(client(), youngerTxProxy, key2), 
willSucceedFast());
         ClientTransaction youngerTx = youngerTxProxy.startedTx();
 
         assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
 
         // Older is allowed to wait with wait-die.
-        CompletableFuture<Void> fut = kvView.putAsync(olderTxProxy, key2, val);
+        CompletableFuture<?> fut = ctx.put.apply(client(), olderTxProxy, key2);
         assertFalse(fut.isDone());
 
+        IgniteImpl ignite = unwrapIgniteImpl(server);
+
+        await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            Iterator<Lock> locks = 
ignite.txManager().lockManager().locks(olderTx.txId());
+
+            return CollectionUtils.count(locks) == 2;
+        });
+
         assertThat(olderTxProxy.rollbackAsync(), willSucceedFast());
 
         // Operation future should be failed.
-        assertThat(fut, 
willThrowWithCauseOrSuppressed(TransactionException.class));
+        assertThat(fut, willThrowWithCauseOrSuppressed(ctx.expectedErr));
+
+        // Ensure inflights cleanup.
+        assertThat(youngerTxProxy.rollbackAsync(), willSucceedFast());
+
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, 
key2)), willSucceedFast());
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947")
+    public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest() 
throws InterruptedException {
+        // Note: reversed tx priority is required for this test.
+        ClientTable table = (ClientTable) table();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
+        List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, 0, table);
+
+        // We need a waiter for this scenario.
+        Tuple key = tuples0.get(0);
+        Tuple val = val("1");
+
+        ClientLazyTransaction tx1 = (ClientLazyTransaction) 
client().transactions().begin();
+        ClientLazyTransaction tx2 = (ClientLazyTransaction) 
client().transactions().begin();
+
+        kvView.put(tx1, key, val);
+
+        // Will wait for lock.
+        CompletableFuture<Void> fut2 = kvView.putAsync(tx2, key, val);
+        assertFalse(fut2.isDone());
+
+        Thread.sleep(500);
+
+        // Rollback should not be blocked.
+        assertThat(tx2.rollbackAsync(), willSucceedFast());
+        assertThat(tx1.rollbackAsync(), willSucceedFast());
+    }
+
+    @ParameterizedTest
+    @MethodSource("killTestContextFactory")
+    public void 
testRollbackDoesNotBlockOnLockConflictWithDirectMapping(KillTestContext ctx) {
+        ClientTable table = (ClientTable) table();
+        ClientSql sql = (ClientSql) client().sql();
+        KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+        Map<Partition, ClusterNode> map = 
table.partitionDistribution().primaryReplicasAsync().join();
+        Entry<Partition, ClusterNode> p0 = null;
+        Entry<Partition, ClusterNode> p1 = null;
+        for (Entry<Partition, ClusterNode> entry : map.entrySet()) {
+            if (p0 == null) {
+                p0 = entry;
+            } else if (!p0.getValue().equals(entry.getValue())) {
+                p1 = entry;
+                break;
+            }
+        }
+
+        // Expecting at least one partition on a different node.
+        assertNotNull(p0);
+        assertNotNull(p1);
+
+        List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int) 
p0.getKey().id(), table);
+        List<Tuple> tuples1 = generateKeysForPartition(100, 10, map, (int) 
p1.getKey().id(), table);
+
+        // Init SQL mappings.
+        Tuple key0 = tuples0.get(0);
+        sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", 
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+                key0.intValue(0), key0.intValue(0) + "");
+        await().atMost(2, TimeUnit.SECONDS)
+                .until(() -> 
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+        ClientLazyTransaction olderTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
+        ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction) 
client().transactions().begin();
+
+        Tuple key = tuples0.get(1);
+        Tuple key2 = tuples0.get(2);
+        Tuple key3 = tuples1.get(0);
+        Tuple key4 = tuples1.get(1);
+
+        assertThat(ctx.put.apply(client(), olderTxProxy, key), 
willSucceedFast());
+
+        ClientTransaction olderTx = olderTxProxy.startedTx();
+
+        assertThat(ctx.put.apply(client(), youngerTxProxy, key2), 
willSucceedFast());
+
+        ClientTransaction youngerTx = youngerTxProxy.startedTx();
+
+        assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
+
+        // Should be directly mapped
+        assertThat(ctx.put.apply(client(), youngerTxProxy, key3), 
willSucceedFast());
+
+        // Younger is not allowed to wait with wait-die.
+        // Next operation should invalidate the transaction.
+        assertThat(ctx.put.apply(client(), youngerTxProxy, key), 
willThrowWithCauseOrSuppressed(ctx.expectedErr));
+
+        olderTxProxy.commit();
+
+        // Ensure all enlisted keys are unlocked.
+        assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2, 
key3, key4)), willSucceedFast());
     }
 
     @AfterEach
@@ -1277,4 +1555,32 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
             return Objects.hash(key, val);
         }
     }
+
+    private static Stream<Arguments> killTestContextFactory() {
+        return Stream.of(
+                argumentSet("kv", new 
KillTestContext(TransactionException.class, 
ItThinClientTransactionsTest::putKv)),
+                argumentSet("sql", new KillTestContext(SqlException.class, 
ItThinClientTransactionsTest::putSql))
+        );
+    }
+
+    private static CompletableFuture<?> putSql(IgniteClient client, 
Transaction tx, Tuple key) {
+        return client.sql()
+                .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?, 
?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+                        key.intValue(0) + "");
+    }
+
+    private static CompletableFuture<?> putKv(IgniteClient client, Transaction 
tx, Tuple key) {
+        return client.tables().tables().get(0).keyValueView().putAsync(tx, 
key, val(key.intValue(0) + ""));
+    }
+
+    private static class KillTestContext {
+        final Class<? extends Exception> expectedErr;
+        final IgniteTriFunction<IgniteClient, Transaction, Tuple, 
CompletableFuture<?>> put;
+
+        public KillTestContext(Class<? extends Exception> expectedErr,
+                IgniteTriFunction<IgniteClient, Transaction, Tuple, 
CompletableFuture<?>> put) {
+            this.expectedErr = expectedErr;
+            this.put = put;
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index aaf913d0dbd..14c5a4837f4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -187,8 +187,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                     TestTransactionIds.newTransactionId(),
                     randomUUID(),
                     true, // implicit
-                    10_000
-            );
+                    10_000,
+                    null);
         });
 
         lenient().when(replicaService.invoke(anyString(), 
any())).then(invocation -> {
@@ -411,8 +411,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 TestTransactionIds.newTransactionId(),
                 randomUUID(),
                 false,
-                10_000
-        );
+                10_000,
+                null);
     }
 
     private InternalTransaction newReadOnlyTransaction() {
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
similarity index 98%
rename from 
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
rename to 
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
index b4cb483cf6a..35b7609b677 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
@@ -47,7 +47,7 @@ import org.junit.jupiter.api.Test;
  * The behavior of this API is similar to the rollback invocation,
  * but a client has to get a specific exception when trying to interact with 
the transaction object.
  */
-public class KillTransactionTest extends ClusterPerClassIntegrationTest {
+public class ItKillTransactionTest extends ClusterPerClassIntegrationTest {
     @Override
     protected void configureInitParameters(InitParametersBuilder builder) {
         super.configureInitParameters(builder);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 0e3b42aa1b3..e1605695da7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -121,6 +121,10 @@ public interface InternalTransaction extends Transaction {
         return false;
     }
 
+    default boolean remoteOnCoordinator() {
+        return false;
+    }
+
     /**
      * Finishes a read-only transaction with a specific execution timestamp.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
index cea9ef47697..fa952c8ac90 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx;
 
+import java.util.function.Consumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.tx.configuration.TransactionConfigurationSchema;
 import org.jetbrains.annotations.Nullable;
@@ -47,11 +48,16 @@ public class InternalTxOptions {
     @Nullable
     private final HybridTimestamp readTimestamp;
 
-    private InternalTxOptions(TxPriority priority, long timeoutMillis, 
@Nullable HybridTimestamp readTimestamp, @Nullable String txLabel) {
+    /** Transaction kill closure. Defines context specific action on tx kill. 
*/
+    private final @Nullable Consumer<InternalTransaction> killClosure;
+
+    private InternalTxOptions(TxPriority priority, long timeoutMillis, 
@Nullable HybridTimestamp readTimestamp, @Nullable String txLabel,
+            @Nullable Consumer<InternalTransaction> killClosure) {
         this.priority = priority;
         this.timeoutMillis = timeoutMillis;
         this.readTimestamp = readTimestamp;
         this.txLabel = txLabel;
+        this.killClosure = killClosure;
     }
 
     public static Builder builder() {
@@ -82,6 +88,10 @@ public class InternalTxOptions {
         return txLabel;
     }
 
+    public @Nullable Consumer<InternalTransaction> killClosure() {
+        return killClosure;
+    }
+
     /** Builder for InternalTxOptions. */
     public static class Builder {
         private TxPriority priority = TxPriority.NORMAL;
@@ -98,6 +108,8 @@ public class InternalTxOptions {
         @Nullable
         private String txLabel = null;
 
+        private Consumer<InternalTransaction> killClosure;
+
         public Builder priority(TxPriority priority) {
             this.priority = priority;
             return this;
@@ -118,8 +130,13 @@ public class InternalTxOptions {
             return this;
         }
 
+        public Builder killClosure(Consumer<InternalTransaction> r) {
+            this.killClosure = r;
+            return this;
+        }
+
         public InternalTxOptions build() {
-            return new InternalTxOptions(priority, timeoutMillis, 
readTimestamp, txLabel);
+            return new InternalTxOptions(priority, timeoutMillis, 
readTimestamp, txLabel, killClosure);
         }
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
index da5311e8da7..f0fc334d85e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -48,6 +48,8 @@ public class TransactionExceptionMapperProvider implements 
IgniteExceptionMapper
                 err -> new IncompatibleSchemaException(err.traceId(), 
err.code(), err.getMessage(), err)));
         mappers.add(unchecked(DelayedAckException.class,
                 err -> new TransactionException(err.traceId(), err.code(), 
err.getMessage(), err.getCause())));
+        mappers.add(unchecked(TransactionKilledException.class,
+                err -> new TransactionException(err.traceId(), err.code(), 
err.getMessage(), err)));
 
         return mappers;
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
similarity index 55%
copy from 
modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
index ef9e06513b5..0aa02f24c7e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
@@ -15,42 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_KILLED_ERR;
 
 import java.util.UUID;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.tx.RetriableTransactionException;
 
 /**
- * Holds the transaction id and the cause for delayed replication ack failure.
+ * Reports killed transaction.
  */
-public class ClientDelayedAckException extends IgniteInternalException {
-    /** Serial version uid. */
+public class TransactionKilledException extends TransactionInternalException 
implements RetriableTransactionException {
     private static final long serialVersionUID = 0L;
 
-    /** Transaction id. */
     private final UUID txId;
 
     /**
-     * Constructor.
-     *
-     * @param traceId Trace ID.
-     * @param code Error code.
+     * Creates the exception with txId and optional txManager for label 
formatting.
      *
-     * @param message String message.
-     * @param txId Related transaction id.
-     * @param cause Cause.
+     * @param txId The transaction id.
+     * @param txManager transaction manager to retrieve label for logging.
      */
-    ClientDelayedAckException(UUID traceId, int code, @Nullable String 
message, UUID txId, @Nullable Throwable cause) {
-        super(traceId, code, message, cause);
-
+    public TransactionKilledException(UUID txId, TxManager txManager) {
+        super(
+                TX_KILLED_ERR,
+                "Transaction is killed " + formatTxInfo(txId, txManager)
+        );
         this.txId = txId;
     }
 
     /**
-     * Gets expected schema version.
+     * Returns a transaction id.
      *
-     * @return Expected schema version.
+     * @return The id.
      */
     public UUID txId() {
         return txId;
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e2e2cb6f619..4d6ecf0f38a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -154,9 +155,8 @@ public interface TxManager extends IgniteComponent {
      * Returns lock manager.
      *
      * @return Lock manager for the given transactions manager.
-     * @deprecated Use lockManager directly.
      */
-    @Deprecated
+    @TestOnly
     LockManager lockManager();
 
     /**
@@ -218,7 +218,7 @@ public interface TxManager extends IgniteComponent {
      */
     CompletableFuture<Void> cleanup(
             @Nullable ZonePartitionId commitPartitionId,
-            Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+            Map<ZonePartitionId, ? extends PartitionEnlistment> 
enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
@@ -270,6 +270,14 @@ public interface TxManager extends IgniteComponent {
      */
     CompletableFuture<Boolean> kill(UUID txId);
 
+    /**
+     * Discards local write intents. Used together with kill command.
+     *
+     * @param groups Groups.
+     * @param txId Transaction id.
+     */
+    CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId);
+
     /**
      * Returns lock retry count.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
index 34075b2c78e..baf57bda9c7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
@@ -63,13 +63,13 @@ public abstract class IgniteAbstractTransactionImpl 
implements InternalTransacti
      * The constructor.
      *
      * @param txManager The tx manager.
-     * @param id The id.
      * @param observableTsTracker Observation timestamp tracker.
+     * @param id The id.
      * @param coordinatorId Transaction coordinator inconsistent ID.
      * @param implicit True for an implicit transaction, false for an ordinary 
one.
      * @param timeout Transaction timeout in milliseconds.
      */
-    public IgniteAbstractTransactionImpl(
+    IgniteAbstractTransactionImpl(
             TxManager txManager,
             HybridTimestampTracker observableTsTracker,
             UUID id,
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index ad18054f37e..25b96de1cf1 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -31,11 +31,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
 import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TransactionKilledException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -70,6 +73,12 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      */
     private boolean noRemoteWrites = true;
 
+    /**
+     * A closure which is called when a transaction is externally killed (not 
by direct user call).
+     * Non-null only for a client's transaction.
+     */
+    private final @Nullable Consumer<InternalTransaction> killClosure;
+
     /**
      * Constructs an explicit read-write transaction.
      *
@@ -79,6 +88,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      * @param txCoordinatorId Transaction coordinator inconsistent ID.
      * @param implicit True for an implicit transaction, false for an ordinary 
one.
      * @param timeout The timeout.
+     * @param killClosure Kill closure.
      */
     public ReadWriteTransactionImpl(
             TxManager txManager,
@@ -86,9 +96,11 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
             UUID id,
             UUID txCoordinatorId,
             boolean implicit,
-            long timeout
+            long timeout,
+            @Nullable Consumer<InternalTransaction> killClosure
     ) {
         super(txManager, observableTsTracker, id, txCoordinatorId, implicit, 
timeout);
+        this.killClosure = killClosure;
     }
 
     /** {@inheritDoc} */
@@ -119,8 +131,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     ) {
         // No need to wait for lock if commit is in progress.
         if (!enlistPartitionLock.readLock().tryLock()) {
-            failEnlist();
-            assert false; // Not reachable.
+            throw enlistFailedException();
         }
 
         try {
@@ -140,20 +151,20 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /**
      * Fails the operation.
      */
-    private void failEnlist() {
-        throw new TransactionException(
-                TX_ALREADY_FINISHED_ERR,
-                format("Transaction is already finished [{}, txState={}].",
-                        formatTxInfo(id(), txManager, false), state()));
+    private RuntimeException enlistFailedException() {
+        return killed ? new TransactionKilledException(id(), txManager) :
+                new TransactionException(
+                        TX_ALREADY_FINISHED_ERR,
+                        format("Transaction is already finished [{}, 
txState={}].",
+                                formatTxInfo(id(), txManager, false), 
state()));
     }
 
     /**
      * Checks that this transaction was not finished and will be able to 
enlist another partition.
      */
     private void checkEnlistPossibility() {
-        if (isFinishingOrFinished()) {
-            // This means that the transaction is either in final or FINISHING 
state.
-            failEnlist();
+        if (isFinishingOrFinished() || killed) {
+            throw enlistFailedException();
         }
     }
 
@@ -220,14 +231,12 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
             if (finishFuture == null) {
                 if (killed) {
                     if (isComplete) {
+                        // An attempt to finish a killed transaction.
                         finishFuture = nullCompletedFuture();
 
-                        return failedFuture(new TransactionException(
-                                TX_ALREADY_FINISHED_ERR,
-                                format("Transaction is killed [{}, 
txState={}].",
-                                        formatTxInfo(id(), txManager, false), 
state())
-                        ));
+                        return failedFuture(new 
TransactionKilledException(id(), txManager));
                     } else {
+                        // Kill is called twice.
                         return nullCompletedFuture();
                     }
                 }
@@ -239,6 +248,9 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                         finishFuture = nullCompletedFuture();
                         this.timeoutExceeded = timeoutExceeded;
                     } else {
+                        if (killClosure == null) {
+                            throw new AssertionError("Invalid kill state for a 
full transaction");
+                        }
                         killed = true;
                     }
                 } else {
@@ -258,6 +270,16 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                         this.timeoutExceeded = timeoutExceeded;
                     } else {
                         killed = true;
+
+                        return finishFutureInternal.handle((unused, throwable) 
-> {
+                            // TODO 
https://issues.apache.org/jira/browse/IGNITE-25825 move before finish after 
async cleanup
+                            if (killClosure != null) {
+                                // Notify the client about the kill.
+                                killClosure.accept(this);
+                            }
+
+                            return null;
+                        });
                     }
 
                     // Return the real future first time.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 3fee6424cf3..83934e5773a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -147,29 +147,6 @@ public class TransactionInflights {
         return res[0];
     }
 
-    /**
-     * Track the given RO transaction until finish.
-     * Currently RO tracking is used to prevent unclosed cursors.
-     *
-     * @param txId The transaction id.
-     * @return {@code True} if the was registered and is in active state.
-     */
-    public boolean trackReadOnly(UUID txId) {
-        boolean[] res = {true};
-
-        txCtxMap.compute(txId, (uuid, ctx) -> {
-            if (ctx == null) {
-                ctx = new ReadOnlyTxContext();
-            }
-
-            res[0] = !ctx.isTxFinishing();
-
-            return ctx;
-        });
-
-        return res[0];
-    }
-
     /**
      * Unregisters the inflight for a transaction.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index fdd2a58c5a3..c08f55865ae 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -291,6 +291,36 @@ public class TxCleanupRequestHandler {
         messagingService.send(sender, ChannelType.DEFAULT, prepareResponse(new 
CleanupReplicatedInfo(txId, partitions)));
     }
 
+    /**
+     * Discards local write intents.
+     *
+     * @param partitions Partitions.
+     * @param txId The transaction id.
+     *
+     * @return The future.
+     */
+    CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> partitions, UUID txId) {
+        Map<EnlistedPartitionGroup, CompletableFuture<?>> writeIntentSwitches 
= IgniteUtils.newHashMap(partitions.size());
+
+        for (EnlistedPartitionGroup partition : partitions) {
+            CompletableFuture<?> future = 
writeIntentSwitchProcessor.switchLocalWriteIntents(
+                    partition,
+                    txId,
+                    false,
+                    null
+            );
+
+            writeIntentSwitches.put(partition, future);
+        }
+
+        releaseTxLocks(txId);
+
+        remotelyTriggeredResourceRegistry.close(txId);
+
+        // We don't care about replicating discarded write intents state, 
because it will be lazily resolved if needed.
+        return allOf(writeIntentSwitches.values().toArray(new 
CompletableFuture<?>[0]));
+    }
+
     private static class CleanupContext {
         private final InternalClusterNode sender;
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index c48d4e655fe..a8e41b8e6d5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -203,7 +203,7 @@ public class TxCleanupRequestSender {
      */
     public CompletableFuture<Void> cleanup(
             @Nullable ZonePartitionId commitPartitionId,
-            Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+            Map<ZonePartitionId, ? extends PartitionEnlistment> 
enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 69ffa216e6a..0bd927e95cc 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -494,7 +494,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
                 txId,
                 localNodeId,
                 implicit,
-                timeout
+                timeout,
+                options.killClosure()
         );
 
         // Implicit transactions are finished as soon as their operation/query 
is finished, they cannot be abandoned, so there is
@@ -1146,7 +1147,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
     @Override
     public CompletableFuture<Void> cleanup(
             @Nullable ZonePartitionId commitPartitionId,
-            Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+            Map<ZonePartitionId, ? extends PartitionEnlistment> 
enlistedPartitions,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
@@ -1202,6 +1203,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler, SystemVi
         return falseCompletedFuture();
     }
 
+    @Override
+    public CompletableFuture<Void> 
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+        return txCleanupRequestHandler.discardLocalWriteIntents(groups, 
txId).handle((r, e) -> {
+            // We don't need tx state any more. NOTE(review): the updater returns null - presumably this drops the meta; confirm.
+            updateTxMeta(txId, old -> null);
+            return null; // handle() maps both success and failure to null, so a discard failure is not propagated to the caller.
+        });
+    }
+
     @Override
     public int lockRetryCount() {
         return lockRetryCount;
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
index 631a6fd26dd..4a613e36c43 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
@@ -304,6 +304,24 @@ public abstract class AbstractDeadlockPreventionTest 
extends AbstractLockingTest
         assertThat(xlock(tx2, k), willSucceedFast());
     }
 
+    @Test
+    public void testDeadlockRecovery() {
+        var tx1 = beginTx();
+        var tx2 = beginTx();
+
+        var k = key("test");
+
+        assertThat(slock(tx2, k), willSucceedFast()); // Both transactions acquire slock on the same key, tx2 first.
+        assertThat(slock(tx1, k), willSucceedFast());
+
+        assertFutureFailsOrWaitsForTimeout(() ->  xlock(tx2, k)); // tx2 requests xlock while tx1 still holds its slock.
+
+        // Failed lock will be rolled to S state. We need to release it 
manually.
+        release(tx2, k, LockMode.S);
+
+        assertThat(xlock(tx1, k), willSucceedFast()); // After tx2's release, tx1's xlock request is expected to succeed.
+    }
+
     /**
      * This method checks lock future of conflicting transaction provided by 
supplier, in a way depending on deadlock prevention policy.
      * If the policy does not allow wait on conflict (wait timeout is equal to 
{@code 0}) then the future must be failed with
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 4ce5d3a18a6..8ddf65e33b9 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -49,8 +49,7 @@ class ReadOnlyTransactionImplTest extends 
BaseIgniteAbstractTest {
                 new UUID(1, 2),
                 10_000,
                 readTimestamp,
-                new CompletableFuture<>()
-        );
+                new CompletableFuture<>());
 
         assertThat(tx.schemaTimestamp(), is(readTimestamp));
     }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 8a4c1a16864..51ab93f3819 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -80,8 +80,7 @@ class ReadWriteTransactionImplTest extends 
BaseIgniteAbstractTest {
         UUID txId = 
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
 
         var tx = new ReadWriteTransactionImpl(
-                txManager, HybridTimestampTracker.atomicTracker(null), txId, 
CLUSTER_NODE.id(), false, 10_000
-        );
+                txManager, HybridTimestampTracker.atomicTracker(null), txId, 
CLUSTER_NODE.id(), false, 10_000, null);
 
         assertThat(tx.schemaTimestamp(), is(beginTs));
     }
@@ -114,8 +113,7 @@ class ReadWriteTransactionImplTest extends 
BaseIgniteAbstractTest {
         UUID txId = 
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
 
         var tx = new ReadWriteTransactionImpl(
-                txManager, HybridTimestampTracker.atomicTracker(null), txId, 
CLUSTER_NODE.id(), false, 10_000
-        );
+                txManager, HybridTimestampTracker.atomicTracker(null), txId, 
CLUSTER_NODE.id(), false, 10_000, null);
 
         tx.assignCommitPartition(txCommitPart);
 


Reply via email to