This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d6b90d309 IGNITE-20874 Node cleanup procedure (#2877)
5d6b90d309 is described below

commit 5d6b90d309bbc7f79ff7f43863507c8d8c3042de
Author: Cyrill <cyrill.si...@gmail.com>
AuthorDate: Fri Dec 8 20:24:13 2023 +0300

    IGNITE-20874 Node cleanup procedure (#2877)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |  10 +-
 .../ignite/internal/table/ItDurableFinishTest.java |   4 +-
 .../ItTxDistributedCleanupRecoveryTest.java        | 144 ++-----------------
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   6 +-
 .../table/distributed/StorageUpdateHandler.java    |  12 +-
 .../table/distributed/TableMessageGroup.java       |   6 +-
 ...pCommand.java => WriteIntentSwitchCommand.java} |   4 +-
 .../table/distributed/raft/PartitionListener.java  |  12 +-
 .../replicator/PartitionReplicaListener.java       | 145 +++++--------------
 .../table/distributed/IndexCleanupTest.java        |  12 +-
 .../table/distributed/StorageCleanupTest.java      |  38 ++---
 .../PartitionRaftCommandsSerializationTest.java    |  20 +--
 .../raft/PartitionCommandListenerTest.java         |  20 +--
 .../PartitionReplicaListenerDurableUnlockTest.java |  47 ++----
 .../replication/PartitionReplicaListenerTest.java  |  16 +--
 .../table/impl/DummyInternalTableImpl.java         |  65 ++++++++-
 .../org/apache/ignite/internal/tx/TxManager.java   |  17 ++-
 .../internal/tx/impl/PlacementDriverHelper.java    | 148 +++++++++++++++++++
 .../internal/tx/impl/TxCleanupRequestHandler.java  | 158 +++++++++++++++++++++
 .../internal/tx/impl/TxCleanupRequestSender.java   | 151 ++++++++++++++++++++
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 114 ++++++---------
 .../ignite/internal/tx/impl/TxMessageSender.java   | 158 +++++++++++++++++++++
 .../tx/impl/WriteIntentSwitchProcessor.java        | 121 ++++++++++++++++
 ...upReplicaRequest.java => TxCleanupMessage.java} |  27 ++--
 .../tx/message/TxCleanupMessageErrorResponse.java  |  35 +++++
 .../tx/message/TxCleanupMessageResponse.java       |  28 ++++
 .../ignite/internal/tx/message/TxMessageGroup.java |  19 ++-
 ...t.java => WriteIntentSwitchReplicaRequest.java} |  12 +-
 modules/transactions/tech-notes/cleanup.puml       |  51 +++++++
 29 files changed, 1147 insertions(+), 453 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 43ff6feaa9..dce43dd8d2 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.fakes;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -176,16 +177,15 @@ public class FakeTxManager implements TxManager {
             Map<TablePartitionId, Long> enlistedGroups,
             UUID txId
     ) {
-        return null;
+        return nullCompletedFuture();
     }
 
     @Override
     public CompletableFuture<Void> cleanup(
-            String primaryConsistentId,
-            TablePartitionId tablePartitionId,
-            UUID txId,
+            Collection<TablePartitionId> partitions,
             boolean commit,
-            @Nullable HybridTimestamp commitTimestamp
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
     ) {
         return nullCompletedFuture();
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index c55f46f26e..ab22a7f004 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -40,8 +40,8 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.DefaultMessagingService;
@@ -229,7 +229,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         // Make sure the finish message is prepared, i.e. the outcome, commit 
timestamp, primary node, etc. have been set,
         // and then temporarily block the messaging to simulate network issues.
         primaryMessaging.dropMessages((s, networkMessage) -> {
-            if (networkMessage instanceof TxCleanupReplicaRequest && 
dropMessage.get()) {
+            if (networkMessage instanceof WriteIntentSwitchReplicaRequest && 
dropMessage.get()) {
                 logger().info("Dropping message: {}.", networkMessage);
 
                 return true;
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index 4f102c2eca..469581cd52 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -17,40 +17,10 @@
 
 package org.apache.ignite.distributed;
 
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.replicator.ReplicaResult;
-import org.apache.ignite.internal.replicator.exception.ReplicationException;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.table.distributed.IndexLocker;
-import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
-import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
-import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
-import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
-import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.util.Lazy;
-import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.network.DefaultMessagingService;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
 /**
@@ -88,66 +58,7 @@ public class ItTxDistributedCleanupRecoveryTest extends 
ItTxDistributedTestSingl
                 replicas(),
                 startClient(),
                 timestampTracker
-        ) {
-
-            @Override
-            protected PartitionReplicaListener newReplicaListener(
-                    MvPartitionStorage mvDataStorage,
-                    RaftGroupService raftClient,
-                    TxManager txManager,
-                    Executor scanRequestExecutor,
-                    int partId,
-                    int tableId,
-                    Supplier<Map<Integer, IndexLocker>> indexesLockers,
-                    Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
-                    Supplier<Map<Integer, TableSchemaAwareIndexStorage>> 
secondaryIndexStorages,
-                    HybridClock hybridClock,
-                    PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTime,
-                    TxStateStorage txStateStorage,
-                    TransactionStateResolver transactionStateResolver,
-                    StorageUpdateHandler storageUpdateHandler,
-                    ValidationSchemasSource validationSchemasSource,
-                    ClusterNode localNode,
-                    SchemaSyncService schemaSyncService,
-                    CatalogService catalogService,
-                    PlacementDriver placementDriver
-            ) {
-                return new PartitionReplicaListener(
-                        mvDataStorage,
-                        raftClient,
-                        txManager,
-                        txManager.lockManager(),
-                        Runnable::run,
-                        partId,
-                        tableId,
-                        indexesLockers,
-                        pkIndexStorage,
-                        secondaryIndexStorages,
-                        hybridClock,
-                        safeTime,
-                        txStateStorage,
-                        transactionStateResolver,
-                        storageUpdateHandler,
-                        validationSchemasSource,
-                        localNode,
-                        schemaSyncService,
-                        catalogService,
-                        placementDriver
-                ) {
-                    @Override
-                    public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, String senderId) {
-                        if (request instanceof TxCleanupReplicaRequest && 
defaultRetryCount.getAndDecrement() > 0) {
-                            logger().info("Dropping cleanup request: {}", 
request);
-
-                            return failedFuture(new ReplicationException(
-                                    REPLICA_COMMON_ERR,
-                                    "Test Tx Cleanup exception 
[replicaGroupId=" + request.groupId() + ']'));
-                        }
-                        return super.invoke(request, senderId);
-                    }
-                };
-            }
-        };
+        );
 
         txTestCluster.prepareCluster();
 
@@ -156,47 +67,18 @@ public class ItTxDistributedCleanupRecoveryTest extends 
ItTxDistributedTestSingl
         accounts = txTestCluster.startTable(ACC_TABLE_NAME, ACC_TABLE_ID, 
ACCOUNTS_SCHEMA);
         customers = txTestCluster.startTable(CUST_TABLE_NAME, CUST_TABLE_ID, 
CUSTOMERS_SCHEMA);
 
-        log.info("Tables have been started");
-    }
-
-
-    @Test
-    @Override
-    public void testDeleteUpsertCommit() throws TransactionException {
-        // The value of 6 is higher than the default retry count.
-        // So we should give up retrying and crash.
-        setDefaultRetryCount(6);
-
-        assertThrows(TransactionException.class, () -> 
deleteUpsert().commit());
-    }
+        txTestCluster.cluster.forEach(clusterService -> {
+            DefaultMessagingService messagingService = 
(DefaultMessagingService) clusterService.messagingService();
+            messagingService.dropMessages((s, networkMessage) -> {
+                if (networkMessage instanceof TxCleanupMessage && 
defaultRetryCount.getAndDecrement() > 0) {
+                    logger().info("Dropping cleanup request: {}", 
networkMessage);
 
-    @Test
-    @Override
-    public void testTransactionAlreadyRolledback() {
-        // The value of 6 is higher than the default retry count.
-        // So we should give up retrying and crash.
-        setDefaultRetryCount(6);
-
-        // Do not check the locks since we have intentionally dropped the 
cleanup request thus the locks are not released yet.
-        testTransactionAlreadyFinished(false, false, (transaction, txId) -> {
-            assertThrows(TransactionException.class, transaction::rollback);
-
-            log.info("Rolled back transaction {}", txId);
+                    return true;
+                }
+                return false;
+            });
         });
-    }
 
-    @Test
-    @Override
-    public void testTransactionAlreadyCommitted() {
-        // The value of 6 is higher than the default retry count.
-        // So we should give up retrying and crash.
-        setDefaultRetryCount(6);
-
-        // Do not check the locks since we have intentionally dropped the 
cleanup request thus the locks are not released yet.
-        testTransactionAlreadyFinished(true, false, (transaction, txId) -> {
-            assertThrows(TransactionException.class, transaction::commit);
-
-            log.info("Committed transaction {}", txId);
-        });
+        log.info("Tables have been started");
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index e0fcc50880..97201815c5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -56,7 +56,7 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -176,11 +176,11 @@ public class 
ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut
                 ) {
                     @Override
                     public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, String senderId) {
-                        if (request instanceof TxCleanupReplicaRequest) {
+                        if (request instanceof 
WriteIntentSwitchReplicaRequest) {
                             logger().info("Dropping cleanup request: {}", 
request);
 
                             releaseTxLocks(
-                                    ((TxCleanupReplicaRequest) request).txId(),
+                                    ((WriteIntentSwitchReplicaRequest) 
request).txId(),
                                     txManager.lockManager()
                             );
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 9d5050e94b..ca678a9ca1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -284,25 +284,27 @@ public class StorageUpdateHandler {
     }
 
     /**
-     * Handles the cleanup of a transaction. The transaction is either 
committed or rolled back.
+     * Switches write intents created by the transaction to regular values if 
the transaction is committed
+     * or removes them if the transaction is aborted.
      *
      * @param txId Transaction id.
      * @param commit Commit flag. {@code true} if transaction is committed, 
{@code false} otherwise.
      * @param commitTimestamp Commit timestamp. Not {@code null} if {@code 
commit} is {@code true}.
      */
-    public void handleTransactionCleanup(UUID txId, boolean commit, @Nullable 
HybridTimestamp commitTimestamp) {
-        handleTransactionCleanup(txId, commit, commitTimestamp, null);
+    public void switchWriteIntents(UUID txId, boolean commit, @Nullable 
HybridTimestamp commitTimestamp) {
+        switchWriteIntents(txId, commit, commitTimestamp, null);
     }
 
     /**
-     * Handles the cleanup of a transaction. The transaction is either 
committed or rolled back.
+     * Switches write intents created by the transaction to regular values if 
the transaction is committed
+     * or removes them if the transaction is aborted.
      *
      * @param txId Transaction id.
      * @param commit Commit flag. {@code true} if transaction is committed, 
{@code false} otherwise.
      * @param commitTimestamp Commit timestamp. Not {@code null} if {@code 
commit} is {@code true}.
      * @param onApplication On application callback.
      */
-    public void handleTransactionCleanup(
+    public void switchWriteIntents(
             UUID txId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 9e697fa2d5..56eeaa3e8d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -23,9 +23,9 @@ import 
org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
-import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
@@ -192,8 +192,8 @@ public interface TableMessageGroup {
         /** Message type for {@link FinishTxCommand}. */
         short FINISH_TX = 40;
 
-        /** Message type for {@link TxCleanupCommand}. */
-        short TX_CLEANUP = 41;
+        /** Message type for {@link WriteIntentSwitchCommand}. */
+        short WRITE_INTENT_SWITCH = 41;
 
         /** Message type for {@link UpdateAllCommand}. */
         short UPDATE_ALL = 42;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
similarity index 92%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
index efed17c540..c51610e307 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
@@ -27,8 +27,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * State machine command to cleanup on a transaction commit.
  */
-@Transferable(TableMessageGroup.Commands.TX_CLEANUP)
-public interface TxCleanupCommand extends PartitionCommand {
+@Transferable(TableMessageGroup.Commands.WRITE_INTENT_SWITCH)
+public interface WriteIntentSwitchCommand extends PartitionCommand {
     /**
      * Returns a commit or a rollback state.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 38e23f85d9..e516799cc2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -62,9 +62,9 @@ import 
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
@@ -199,8 +199,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                     handleUpdateAllCommand((UpdateAllCommand) command, 
commandIndex, commandTerm);
                 } else if (command instanceof FinishTxCommand) {
                     handleFinishTxCommand((FinishTxCommand) command, 
commandIndex, commandTerm);
-                } else if (command instanceof TxCleanupCommand) {
-                    handleTxCleanupCommand((TxCleanupCommand) command, 
commandIndex, commandTerm);
+                } else if (command instanceof WriteIntentSwitchCommand) {
+                    handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) 
command, commandIndex, commandTerm);
                 } else if (command instanceof SafeTimeSyncCommand) {
                     handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, 
commandIndex, commandTerm);
                 } else if (command instanceof BuildIndexCommand) {
@@ -368,13 +368,13 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
 
     /**
-     * Handler for the {@link TxCleanupCommand}.
+     * Handler for the {@link WriteIntentSwitchCommand}.
      *
      * @param cmd Command.
      * @param commandIndex Index of the RAFT command.
      * @param commandTerm Term of the RAFT command.
      */
-    private void handleTxCleanupCommand(TxCleanupCommand cmd, long 
commandIndex, long commandTerm) {
+    private void handleWriteIntentSwitchCommand(WriteIntentSwitchCommand cmd, 
long commandIndex, long commandTerm) {
         // Skips the write command because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
             return;
@@ -384,7 +384,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
         markFinished(txId, cmd.commit(), cmd.commitTimestamp(), 
cmd.txCoordinatorId());
 
-        storageUpdateHandler.handleTransactionCleanup(txId, cmd.commit(), 
cmd.commitTimestamp(),
+        storageUpdateHandler.switchWriteIntents(txId, cmd.commit(), 
cmd.commitTimestamp(),
                 () -> storage.lastApplied(commandIndex, commandTerm));
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0628f26bd6..232bce8a49 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
@@ -36,12 +35,10 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollection
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
-import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_WAS_ABORTED_ERR;
 import static org.apache.ignite.raft.jraft.util.internal.ThrowUtil.hasCause;
@@ -65,7 +62,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -124,10 +120,10 @@ import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuild
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessageBuilder;
-import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
+import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
@@ -160,10 +156,10 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
@@ -184,10 +180,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PartitionReplicaListener.class);
 
-    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
-
-    private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5;
-
     /** Factory to create RAFT command messages. */
     private static final TableMessagesFactory MSG_FACTORY = new 
TableMessagesFactory();
 
@@ -388,7 +380,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
-        return cleanup(txId, txMeta)
+        Collection<TablePartitionId> enlistedPartitions = 
txMeta.enlistedPartitions();
+
+        boolean commit = txMeta.txState() == COMMITED;
+
+        HybridTimestamp commitTimestamp = txMeta.commitTimestamp();
+
+        return txManager.cleanup(enlistedPartitions, commit, commitTimestamp, 
txId)
                 .handle((v, e) -> {
                     if (e == null) {
                         return txManager.executeCleanupAsync(() -> 
markLocksReleased(
@@ -706,8 +704,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return nullCompletedFuture();
         } else if (request instanceof TxFinishReplicaRequest) {
             return processTxFinishAction((TxFinishReplicaRequest) request, 
senderId);
-        } else if (request instanceof TxCleanupReplicaRequest) {
-            return processTxCleanupAction((TxCleanupReplicaRequest) request);
+        } else if (request instanceof WriteIntentSwitchReplicaRequest) {
+            return 
processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest) request);
         } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
             return 
processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, 
isPrimary);
         } else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
@@ -1534,83 +1532,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
         }
 
-        CompletableFuture<?> changeStateFuture = 
finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, 
txCoordinatorId);
-
-        return cleanup(changeStateFuture, enlistedPartitions, commit, 
commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA)
-                .thenRun(() -> markLocksReleased(
-                        txId,
-                        enlistedPartitions,
-                        commit ? COMMITED : ABORTED,
-                        commitTimestamp)
+        return finishTransaction(enlistedPartitions, txId, commit, 
commitTimestamp, txCoordinatorId)
+                .thenCompose(v -> txManager.cleanup(enlistedPartitions, 
commit, commitTimestamp, txId))
+                .thenRun(() ->
+                        markLocksReleased(
+                                txId,
+                                enlistedPartitions,
+                                commit ? COMMITED : ABORTED,
+                                commitTimestamp)
                 );
     }
 
-    private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) {
-        return cleanup(nullCompletedFuture(), txMeta.enlistedPartitions(), 
txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1);
-    }
-
-    // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts 
count.
-    private CompletableFuture<Void> cleanup(
-            CompletableFuture<?> changeStateFuture,
-            Collection<TablePartitionId> enlistedPartitions,
-            boolean commit,
-            @Nullable HybridTimestamp commitTimestamp,
-            UUID txId,
-            int attemptsToCleanupReplica
-    ) {
-        CompletableFuture<?>[] futures = enlistedPartitions.stream()
-                .map(partitionId -> changeStateFuture.thenCompose(ignored ->
-                        // TODO: IGNITE-20874 Use the node cleanup procedure 
instead of the replication group cleanup one.
-                        cleanupWithRetry(commit, commitTimestamp, txId, 
partitionId, attemptsToCleanupReplica)))
-                .toArray(size -> new CompletableFuture<?>[size]);
-
-        return allOf(futures);
-    }
-
-    private CompletableFuture<Void> cleanupWithRetry(
-            boolean commit,
-            @Nullable HybridTimestamp commitTimestamp,
-            UUID txId,
-            TablePartitionId partitionId,
-            int attempts) {
-        return findPrimaryReplica(partitionId, hybridClock.now())
-                .thenCompose(leaseHolder ->
-                        txManager.cleanup(leaseHolder, partitionId, txId, 
commit, commitTimestamp))
-                .handle((res, ex) -> {
-                    if (ex != null) {
-                        if (attempts > 0) {
-                            LOG.warn("Failed to perform cleanup on Tx. The 
operation will be retried [txId={}].", txId, ex);
-                        } else {
-                            LOG.warn("Failed to perform cleanup on Tx 
[txId={}].", txId, ex);
-                        }
-
-                        if (attempts > 0) {
-                            return cleanupWithRetry(commit, commitTimestamp, 
txId, partitionId, attempts - 1);
-                        }
-
-                        return CompletableFuture.<Void>failedFuture(ex);
-                    }
-
-                    return CompletableFutures.<Void>nullCompletedFuture();
-                })
-                .thenCompose(Function.identity());
-    }
-
-    private CompletableFuture<String> findPrimaryReplica(TablePartitionId 
partitionId, HybridTimestamp now) {
-        return placementDriver.awaitPrimaryReplica(partitionId, now, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
-                .handle((primaryReplica, e) -> {
-                    if (e != null) {
-                        LOG.error("Failed to retrieve primary replica for 
partition {}", partitionId, e);
-
-                        throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR,
-                                "Failed to get the primary replica"
-                                        + " [tablePartitionId=" + partitionId 
+ ", awaitTimestamp=" + now + ']', e);
-                    }
-
-                    return primaryReplica.getLeaseholder();
-                });
-    }
-
     /**
      * Finishes a transaction. This operation is idempotent.
      *
@@ -1694,7 +1626,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return CompletableFuture of void.
      */
     // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
-    private CompletableFuture<Void> 
processTxCleanupAction(TxCleanupReplicaRequest request) {
+    private CompletableFuture<Void> 
processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
         try {
             closeAllTransactionCursors(request.txId());
         } catch (Exception e) {
@@ -1711,22 +1643,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         HybridTimestamp commandTimestamp = hybridClock.now();
 
                         return reliableCatalogVersionFor(commandTimestamp)
-                                .thenCompose(catalogVersion ->
-                                        applyCleanupCommand(
-                                                request.txId(),
-                                                request.commit(),
-                                                request.commitTimestamp(),
-                                                request.commitTimestampLong(),
-                                                catalogVersion
-                                        ))
-                                .thenApply(unused -> res);
+                                .thenCompose(catalogVersion -> {
+                                    applyWriteIntentSwitchCommand(
+                                            request.txId(),
+                                            request.commit(),
+                                            request.commitTimestamp(),
+                                            request.commitTimestampLong(),
+                                            catalogVersion
+                                    );
+
+                                    return nullCompletedFuture();
+                                });
                     } else {
-                        return completedFuture(res);
-                    }
-                })
-                .thenAccept(res -> {
-                    if (res.hadUpdateFutures() || res.hadReadFutures()) {
-                        releaseTxLocks(request.txId());
+                        return nullCompletedFuture();
                     }
                 });
     }
@@ -1759,14 +1688,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .thenApply(v -> new 
FuturesCleanupResult(!txReadFutures.isEmpty(), !txUpdateFutures.isEmpty()));
     }
 
-    private CompletableFuture<Void> applyCleanupCommand(
+    private CompletableFuture<Void> applyWriteIntentSwitchCommand(
             UUID transactionId,
             boolean commit,
             HybridTimestamp commitTimestamp,
             long commitTimestampLong,
             int catalogVersion
     ) {
-        TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
+        WriteIntentSwitchCommand wiSwitchCmd = 
MSG_FACTORY.writeIntentSwitchCommand()
                 .txId(transactionId)
                 .commit(commit)
                 .commitTimestampLong(commitTimestampLong)
@@ -1775,11 +1704,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 .requiredCatalogVersion(catalogVersion)
                 .build();
 
-        storageUpdateHandler.handleTransactionCleanup(transactionId, commit, 
commitTimestamp);
+        storageUpdateHandler.switchWriteIntents(transactionId, commit, 
commitTimestamp);
 
         CompletableFuture<Object> resultFuture = new CompletableFuture<>();
 
-        applyCmdWithRetryOnSafeTimeReorderException(txCleanupCmd, 
resultFuture);
+        applyCmdWithRetryOnSafeTimeReorderException(wiSwitchCmd, resultFuture);
 
         return resultFuture
                 .exceptionally(e -> {
@@ -3485,7 +3414,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             // The cleanup for this row has already been triggered. For 
example, we are resolving a write intent for an RW transaction
             // and a concurrent RO transaction resolves the same row, hence 
computeIfAbsent.
             return txManager.executeCleanupAsync(() ->
-                    inBusyLock(busyLock, () -> 
storageUpdateHandler.handleTransactionCleanup(txId, txState == COMMITED, 
commitTimestamp))
+                    inBusyLock(busyLock, () -> 
storageUpdateHandler.switchWriteIntents(txId, txState == COMMITED, 
commitTimestamp))
             ).whenComplete((unused, e) -> {
                 if (e != null) {
                     LOG.warn("Failed to complete transaction cleanup command 
[txId=" + txId + ']', e);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
index 2fc1ad4f99..7e08c7c9ce 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -47,7 +47,7 @@ public class IndexCleanupTest extends IndexBaseTest {
         assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
         assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(0, storage.rowsCount());
         assertTrue(pkInnerStorage.allRowsIds().isEmpty());
@@ -94,7 +94,7 @@ public class IndexCleanupTest extends IndexBaseTest {
         writer.addWrite(storageUpdateHandler, rowUuid, row);
         writer.addWrite(storageUpdateHandler, rowUuid, null);
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(0, storage.rowsCount());
         assertTrue(pkInnerStorage.allRowsIds().isEmpty());
@@ -119,7 +119,7 @@ public class IndexCleanupTest extends IndexBaseTest {
 
         writer.addWrite(storageUpdateHandler, rowUuid2, binaryRow(key2, 
value));
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(1, storage.rowsCount());
 
@@ -152,7 +152,7 @@ public class IndexCleanupTest extends IndexBaseTest {
 
         writer.addWrite(storageUpdateHandler, rowUuid, row2);
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(1, storage.rowsCount());
 
@@ -173,7 +173,7 @@ public class IndexCleanupTest extends IndexBaseTest {
 
         writer.addWrite(storageUpdateHandler, rowUuid, null);
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(1, storage.rowsCount());
         assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
@@ -194,7 +194,7 @@ public class IndexCleanupTest extends IndexBaseTest {
 
         writer.addWrite(storageUpdateHandler, rowUuid, row);
 
-        storageUpdateHandler.handleTransactionCleanup(getTxId(), false, null);
+        storageUpdateHandler.switchWriteIntents(getTxId(), false, null);
 
         assertEquals(1, storage.rowsCount());
         assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 79ad4870ac..a3c2e5ecf7 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -167,7 +167,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         assertEquals(3, storage.rowsCount());
 
-        storageUpdateHandler.handleTransactionCleanup(txUuid, false, null);
+        storageUpdateHandler.switchWriteIntents(txUuid, false, null);
 
         assertEquals(0, storage.rowsCount());
     }
@@ -192,7 +192,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         // We have three writes to the storage.
         verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), 
anyInt());
 
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         assertEquals(3, storage.rowsCount());
         // Those writes resulted in three commits.
@@ -202,7 +202,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         clearInvocations(storage);
 
         // And run cleanup again for the same transaction.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         assertEquals(3, storage.rowsCount());
         // And no invocation after, meaning idempotence of the cleanup.
@@ -240,7 +240,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         // We have three writes to the storage.
         verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), 
anyInt());
 
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         assertEquals(3, storage.rowsCount());
         // Those writes resulted in three commits.
@@ -250,7 +250,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         clearInvocations(storage);
 
         // And run cleanup again for the same transaction.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         assertEquals(3, storage.rowsCount());
         // And no invocation after, meaning idempotence of the cleanup.
@@ -279,7 +279,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         verify(storage, times(2)).addWrite(any(), any(), any(), anyInt(), 
anyInt());
 
         // And run cleanup again for the same transaction.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         ReadResult resultAfterDelete1 = storage.read(new 
RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE);
         assertEquals(row1, resultAfterDelete1.binaryRow());
@@ -315,7 +315,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         assertEquals(3, storage.rowsCount());
 
         // Now run cleanup.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // But the loss of the state results in no cleanup, and the entries 
are still write intents.
         verify(storage, never()).commitWrite(any(), any());
@@ -326,7 +326,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         storageUpdateHandler.handleWriteIntentRead(txUuid, new 
RowId(PARTITION_ID, row1Id));
 
         // Run the cleanup.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // Only the discovered write intent was committed, the other two are 
still write intents.
         verify(storage, times(1)).commitWrite(any(), any());
@@ -346,7 +346,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         clearInvocations(storage);
 
         // Run cleanup for the original transaction
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // Only those two entries will be affected.
         verify(storage, times(2)).commitWrite(any(), any());
@@ -384,7 +384,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         assertEquals(3, storage.rowsCount());
 
         // Now run cleanup.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // But the loss of the state results in no cleanup, and the entries 
are still write intents.
         verify(storage, never()).commitWrite(any(), any());
@@ -395,7 +395,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         storageUpdateHandler.handleWriteIntentRead(txUuid, new 
RowId(PARTITION_ID, row1Id));
 
         // Run the cleanup.
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // Only the discovered write intent was committed, the other two are 
still write intents.
         verify(storage, times(1)).commitWrite(any(), any());
@@ -420,7 +420,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
         clearInvocations(storage);
 
         // Run cleanup for the original transaction
-        storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs);
 
         // Only those two entries will be affected.
         verify(storage, times(2)).commitWrite(any(), any());
@@ -461,7 +461,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdate(committedTx, rowId, partitionId, 
row1, true, null, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committedTx, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -502,7 +502,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdateAll(committedTx, rowsToUpdate, 
partitionId, true, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committedTx, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -712,7 +712,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committed1, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -768,7 +768,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, 
partitionId, true, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committed1, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -828,7 +828,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committed1, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -886,7 +886,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, 
partitionId, true, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committed1, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
@@ -948,7 +948,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         storageUpdateHandler.handleUpdate(committed1, rowId, partitionId, 
row1, true, null, null, null);
 
-        storageUpdateHandler.handleTransactionCleanup(committed1, true, 
commitTs);
+        storageUpdateHandler.switchWriteIntents(committed1, true, commitTs);
 
         assertEquals(1, storage.rowsCount());
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 8acabb11f3..ac7b8db3f8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -196,14 +196,14 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
     public void testTxCleanupCommand() throws Exception {
         HybridClock clock = new HybridClockImpl();
 
-        TxCleanupCommand cmd = msgFactory.txCleanupCommand()
+        WriteIntentSwitchCommand cmd = msgFactory.writeIntentSwitchCommand()
                 .txId(UUID.randomUUID())
                 .commit(true)
                 .commitTimestampLong(clock.nowLong())
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build();
 
-        TxCleanupCommand readCmd = copyCommand(cmd);
+        WriteIntentSwitchCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
         assertEquals(cmd.commit(), readCmd.commit());
@@ -251,14 +251,14 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
                     .commitTimestampLong(finishTxCommand.commitTimestampLong())
                     .txCoordinatorId(finishTxCommand.txCoordinatorId())
                     .build();
-        } else if (cmd instanceof TxCleanupCommand) {
-            TxCleanupCommand txCleanupCommand = (TxCleanupCommand) cmd;
-
-            return (T) msgFactory.txCleanupCommand()
-                    .txId(txCleanupCommand.txId())
-                    .commit(txCleanupCommand.commit())
-                    
.commitTimestampLong(txCleanupCommand.commitTimestampLong())
-                    .txCoordinatorId(txCleanupCommand.txCoordinatorId())
+        } else if (cmd instanceof WriteIntentSwitchCommand) {
+            WriteIntentSwitchCommand writeIntentSwitchCommand = 
(WriteIntentSwitchCommand) cmd;
+
+            return (T) msgFactory.writeIntentSwitchCommand()
+                    .txId(writeIntentSwitchCommand.txId())
+                    .commit(writeIntentSwitchCommand.commit())
+                    
.commitTimestampLong(writeIntentSwitchCommand.commitTimestampLong())
+                    
.txCoordinatorId(writeIntentSwitchCommand.txCoordinatorId())
                     .build();
         } else if (cmd instanceof UpdateCommand) {
             UpdateCommand updateCommand = (UpdateCommand) cmd;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 8d2302ecf2..e278eb241f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -87,8 +87,8 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TimedBinaryRowMessage;
-import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -338,8 +338,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         UpdateCommand updateCommand = mock(UpdateCommand.class);
         when(updateCommand.safeTime()).thenAnswer(v -> hybridClock.now());
 
-        TxCleanupCommand txCleanupCommand = mock(TxCleanupCommand.class);
-        when(txCleanupCommand.safeTime()).thenAnswer(v -> hybridClock.now());
+        WriteIntentSwitchCommand writeIntentSwitchCommand = 
mock(WriteIntentSwitchCommand.class);
+        when(writeIntentSwitchCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
 
         SafeTimeSyncCommand safeTimeSyncCommand = 
mock(SafeTimeSyncCommand.class);
         when(safeTimeSyncCommand.safeTime()).thenAnswer(v -> 
hybridClock.now());
@@ -351,7 +351,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         commandListener.onWrite(List.of(
                 writeCommandCommandClosure(3, 1, updateCommand, 
commandClosureResultCaptor),
                 writeCommandCommandClosure(10, 1, updateCommand, 
commandClosureResultCaptor),
-                writeCommandCommandClosure(4, 1, txCleanupCommand, 
commandClosureResultCaptor),
+                writeCommandCommandClosure(4, 1, writeIntentSwitchCommand, 
commandClosureResultCaptor),
                 writeCommandCommandClosure(5, 1, safeTimeSyncCommand, 
commandClosureResultCaptor)
         ).iterator());
 
@@ -629,7 +629,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build());
 
-        invokeBatchedCommand(msgFactory.txCleanupCommand()
+        invokeBatchedCommand(msgFactory.writeIntentSwitchCommand()
                 .txId(txId)
                 .commit(true)
                 .commitTimestampLong(commitTimestamp.longValue())
@@ -672,7 +672,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build());
 
-        invokeBatchedCommand(msgFactory.txCleanupCommand()
+        invokeBatchedCommand(msgFactory.writeIntentSwitchCommand()
                 .txId(txId)
                 .commit(true)
                 .commitTimestampLong(commitTimestamp.longValue())
@@ -710,7 +710,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 .txCoordinatorId(UUID.randomUUID().toString())
                 .build());
 
-        invokeBatchedCommand(msgFactory.txCleanupCommand()
+        invokeBatchedCommand(msgFactory.writeIntentSwitchCommand()
                 .txId(txId)
                 .commit(true)
                 .commitTimestampLong(commitTimestamp.longValue())
@@ -759,7 +759,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
         HybridTimestamp commitTimestamp = hybridClock.now();
 
-        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
+        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.writeIntentSwitchCommand()
                 .txId(txId)
                 .commit(true)
                 .commitTimestampLong(commitTimestamp.longValue())
@@ -802,7 +802,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
 
         HybridTimestamp commitTimestamp = hybridClock.now();
 
-        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
+        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.writeIntentSwitchCommand()
                 .txId(txId)
                 .commit(true)
                 .commitTimestampLong(commitTimestamp.longValue())
@@ -879,7 +879,7 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
         long commitTimestamp = hybridClock.nowLong();
 
         txIds.forEach(txId -> invokeBatchedCommand(
-                msgFactory.txCleanupCommand()
+                msgFactory.writeIntentSwitchCommand()
                         .txId(txId)
                         .commit(true)
                         .commitTimestampLong(commitTimestamp)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
index 28fc8e54ea..8fc5b2a2eb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.replication;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
@@ -24,14 +26,13 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -115,10 +115,12 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
         }).when(txManager).executeCleanupAsync(any(Supplier.class));
 
         doAnswer(invocation -> {
-            UUID txId = invocation.getArgument(2);
-            TablePartitionId partitionId = invocation.getArgument(1);
-            return cleanupCallback.apply(txId, partitionId);
-        }).when(txManager).cleanup(anyString(), any(), any(), anyBoolean(), 
any());
+            UUID txId = invocation.getArgument(3);
+            Collection<TablePartitionId> partitions = 
invocation.getArgument(0);
+            return allOf(partitions.stream()
+                    .map(partitionId -> cleanupCallback.apply(txId, 
partitionId))
+                    .toArray(size -> new CompletableFuture<?>[size]));
+        }).when(txManager).cleanup(any(), anyBoolean(), any(), any());
 
         partitionReplicaListener = new PartitionReplicaListener(
                 new TestMvPartitionStorage(PART_ID),
@@ -187,7 +189,7 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
             assertThat(exceptionCounter.get(), greaterThan(0));
 
             if (exceptionCounter.decrementAndGet() > 0) {
-                throw new RuntimeException("test exception");
+                return failedFuture(new RuntimeException("test exception"));
             }
 
             return nullCompletedFuture();
@@ -201,33 +203,4 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
 
         assertEquals(0, exceptionCounter.get());
     }
-
-    @Test
-    public void testCantGetPrimaryReplicaForEnlistedPartition() throws 
InterruptedException {
-        UUID tx0 = TestTransactionIds.newTransactionId();
-        TablePartitionId part0 = new TablePartitionId(TABLE_ID, PART_ID);
-        txStateStorage.put(tx0, new TxMeta(TxState.COMMITED, List.of(part0), 
null));
-
-        cleanupCallback = (tx, partId) -> nullCompletedFuture();
-
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = new 
CompletableFuture<>();
-        placementDriver.setAwaitPrimaryReplicaFunction((groupId, timestamp) -> 
primaryReplicaFuture);
-
-        PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now());
-        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
-
-        assertFalse(txStateStorage.get(tx0).locksReleased());
-
-        Thread primaryReplicaFutureCompleteThread =
-                new Thread(() -> 
primaryReplicaFuture.completeExceptionally(new RuntimeException("test 
exception")));
-        primaryReplicaFutureCompleteThread.start();
-
-        assertFalse(txStateStorage.get(tx0).locksReleased());
-
-        placementDriver.setAwaitPrimaryReplicaFunction(null);
-
-        primaryReplicaFutureCompleteThread.join();
-
-        assertTrue(txStateStorage.get(tx0).locksReleased());
-    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 7842f1d076..5a7cd8b007 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -127,9 +127,9 @@ import 
org.apache.ignite.internal.table.distributed.command.CatalogVersionAware;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
 import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
+import 
org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -228,10 +228,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private final LockManager lockManager = new HeapLockManager();
 
     private final Function<PartitionCommand, CompletableFuture<?>> 
defaultMockRaftFutureClosure = cmd -> {
-        if (cmd instanceof TxCleanupCommand) {
+        if (cmd instanceof WriteIntentSwitchCommand) {
             Set<RowId> rows = pendingRows.remove(cmd.txId());
 
-            HybridTimestamp commitTimestamp = ((TxCleanupCommand) 
cmd).commitTimestamp();
+            HybridTimestamp commitTimestamp = ((WriteIntentSwitchCommand) 
cmd).commitTimestamp();
             assertNotNull(commitTimestamp);
 
             if (rows != null) {
@@ -1303,7 +1303,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             txManager.updateTxMeta(txId, old -> new 
TxStateMeta(TxState.COMMITED, UUID.randomUUID().toString(), commitPartitionId, 
now));
 
             CompletableFuture<?> replicaCleanupFut = 
partitionReplicaListener.invoke(
-                    TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+                    TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
                             .groupId(grpId)
                             .txId(txId)
                             .commit(true)
@@ -1467,7 +1467,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> beginAndAbortTx() {
-        when(txManager.cleanup(any(), any(), any(), anyBoolean(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdFor(beginTimestamp);
@@ -1529,7 +1529,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private CompletableFuture<?> beginAndCommitTx() {
-        when(txManager.cleanup(any(), any(), any(), anyBoolean(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdFor(beginTimestamp);
@@ -2247,7 +2247,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .thenReturn(List.of(
                         tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col")))
                 ));
-        when(txManager.cleanup(any(), any(), any(), anyBoolean(), 
any())).thenReturn(nullCompletedFuture());
+        when(txManager.cleanup(any(), anyBoolean(), any(), 
any())).thenReturn(nullCompletedFuture());
 
         AtomicReference<Boolean> committed = interceptFinishTxCommand();
 
@@ -2476,7 +2476,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, 
UUID.randomUUID().toString(), commitPartitionId, commitTs));
 
         partitionReplicaListener.invoke(
-                TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+                TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
                         .groupId(grpId)
                         .txId(txId)
                         .commit(true)
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index ec5112cd8f..e6e87bcccb 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -94,11 +95,13 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import 
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
+import org.apache.ignite.network.AbstractMessagingService;
+import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -425,7 +428,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         when(topologyService.localMember()).thenReturn(LOCAL_NODE);
 
         ClusterService clusterService = mock(ClusterService.class);
-        
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
+
+        when(clusterService.messagingService()).thenReturn(new 
DummyMessagingService(LOCAL_NODE.name()));
         when(clusterService.topologyService()).thenReturn(topologyService);
 
         var txManager = new TxManagerImpl(
@@ -485,4 +489,61 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
             }
         };
     }
+
+    /**
+     * Dummy messaging service for test purposes.
+     * It does not provide any messaging functionality, but allows triggering 
events.
+     */
+    private static class DummyMessagingService extends 
AbstractMessagingService {
+        private final String localNodeName;
+
+        private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+        DummyMessagingService(String localNodeName) {
+            this.localNodeName = localNodeName;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> send(ClusterNode recipient, ChannelType 
channelType, NetworkMessage msg) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        @Override
+        public CompletableFuture<Void> send(String recipientConsistentId, 
ChannelType channelType, NetworkMessage msg) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(ClusterNode recipient, 
ChannelType type, NetworkMessage msg, long correlationId) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<Void> respond(String recipientConsistentId, 
ChannelType type, NetworkMessage msg, long correlationId) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
ChannelType type, NetworkMessage msg, long timeout) {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(String 
recipientNodeId, ChannelType type, NetworkMessage msg, long timeout) {
+            getMessageHandlers(msg.groupType()).forEach(h -> h.onReceived(msg, 
localNodeName, correlationIdGenerator.getAndIncrement()));
+
+            return nullCompletedFuture();
+        }
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index fc6a98888f..9a6e287120 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -129,21 +130,19 @@ public interface TxManager extends IgniteComponent {
     );
 
     /**
-     * Sends cleanup request to the specified primary replica.
+     * Sends cleanup request to the cluster nodes that host primary replicas 
for the enlisted partitions.
      *
-     * @param primaryConsistentId  A consistent id of the primary replica node.
-     * @param tablePartitionId Table partition id.
-     * @param txId Transaction id.
-     * @param commit {@code True} if a commit requested.
+     * @param partitions Enlisted partition groups.
+     * @param commit {@code true} if a commit is requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            String primaryConsistentId,
-            TablePartitionId tablePartitionId,
-            UUID txId,
+            Collection<TablePartitionId> partitions,
             boolean commit,
-            @Nullable HybridTimestamp commitTimestamp
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
     );
 
     /**
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
new file mode 100644
index 0000000000..0df5bc3495
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PlacementDriverHelper.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * A helper class to retrieve primary replicas with exception handling.
+ */
+public class PlacementDriverHelper {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(PlacementDriverHelper.class);
+
+    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
+    /** Placement driver. */
+    private final PlacementDriver placementDriver;
+
+    /** A hybrid logical clock. */
+    private final HybridClock clock;
+
+    /**
+     * Constructor.
+     *
+     * @param placementDriver Placement driver.
+     * @param clock A hybrid logical clock.
+     */
+    public PlacementDriverHelper(PlacementDriver placementDriver, HybridClock 
clock) {
+        this.placementDriver = placementDriver;
+        this.clock = clock;
+    }
+
+    /**
+     * Wait for primary replica to appear for the provided partition.
+     *
+     * @param partitionId Partition id.
+     * @return Future that completes with the meta of the primary replica for the 
provided partition, or completes with an exception if no primary
+     *         appeared during the await timeout.
+     */
+    public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaWithExceptionHandling(TablePartitionId partitionId) {
+        HybridTimestamp timestamp = clock.now();
+
+        return placementDriver.awaitPrimaryReplica(partitionId, timestamp, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+                .handle((primaryReplica, e) -> {
+                    if (e != null) {
+                        LOG.debug("Failed to retrieve primary replica for 
partition {}", partitionId, e);
+
+                        throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR,
+                                "Failed to get the primary replica"
+                                        + " [tablePartitionId=" + partitionId 
+ ", awaitTimestamp=" + timestamp + ']', e);
+                    }
+
+                    return primaryReplica;
+                });
+    }
+
+    /**
+     * Get primary replicas for the provided partitions.
+     *
+     * @param partitions A collection of partitions.
+     * @return A future that completes with a map of node to the partitions 
the node is primary for and a collection of partitions that we
+     *         failed to find the primary for.
+     */
+    public CompletableFuture<PartitionData> 
findPrimaryReplicas(Collection<TablePartitionId> partitions) {
+        HybridTimestamp timestamp = clock.now();
+
+        Map<TablePartitionId, CompletableFuture<ReplicaMeta>> 
primaryReplicaFutures = new HashMap<>();
+
+        // Please note that we are using `get primary replica` instead of 
`await primary replica`.
+        // This method is faster, yet we still have the correctness:
+        // If the primary replica has not changed, get will return a valid 
value and we'll send an unlock request to this node.
+        // If the primary replica has expired and get returns null (or a 
different node), the primary node step down logic
+        // will automatically release the locks on that node. All we need to 
do is to clean the storage.
+        for (TablePartitionId partitionId : partitions) {
+            primaryReplicaFutures.put(partitionId, 
placementDriver.getPrimaryReplica(partitionId, timestamp));
+        }
+
+        return allOf(primaryReplicaFutures.values().toArray(new 
CompletableFuture<?>[0]))
+                .thenApply(v -> {
+                    Map<String, Set<TablePartitionId>> partitionsByNode = new 
HashMap<>();
+
+                    Set<TablePartitionId> partitionsWithoutPrimary = new 
HashSet<>();
+
+                    for (Entry<TablePartitionId, 
CompletableFuture<ReplicaMeta>> entry : primaryReplicaFutures.entrySet()) {
+                        // Safe to call join, the future has already finished.
+                        ReplicaMeta meta = entry.getValue().join();
+
+                        TablePartitionId partition = entry.getKey();
+
+                        if (meta != null) {
+                            
partitionsByNode.computeIfAbsent(meta.getLeaseholder(), s -> new HashSet<>())
+                                    .add(partition);
+                        } else {
+                            partitionsWithoutPrimary.add(partition);
+                        }
+                    }
+                    return new PartitionData(partitionsByNode, 
partitionsWithoutPrimary);
+                });
+    }
+
+    /**
+     * The result of retrieving primary replicas for a collection of 
partitions.
+     */
+    public static class PartitionData {
+        final Map<String, Set<TablePartitionId>> partitionsByNode;
+
+        final Set<TablePartitionId> partitionsWithoutPrimary;
+
+        PartitionData(Map<String, Set<TablePartitionId>> partitionsByNode, 
Set<TablePartitionId> partitionsWithoutPrimary) {
+            this.partitionsByNode = partitionsByNode;
+            this.partitionsWithoutPrimary = partitionsWithoutPrimary;
+        }
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
new file mode 100644
index 0000000000..6671e68603
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles TX Cleanup request ({@link TxCleanupMessage}).
+ */
+public class TxCleanupRequestHandler {
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** Lock manager. */
+    private final LockManager lockManager;
+
+    /** Hybrid clock. */
+    private final HybridClock hybridClock;
+
+    /** Write intent switch processor. */
+    private final WriteIntentSwitchProcessor writeIntentSwitchProcessor;
+
+    /**
+     * The constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param lockManager Lock manager.
+     * @param clock A hybrid logical clock.
+     * @param writeIntentSwitchProcessor A cleanup processor.
+     */
+    public TxCleanupRequestHandler(
+            ClusterService clusterService,
+            LockManager lockManager,
+            HybridClock clock,
+            WriteIntentSwitchProcessor writeIntentSwitchProcessor
+    ) {
+        this.clusterService = clusterService;
+        this.lockManager = lockManager;
+        this.hybridClock = clock;
+        this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
+    }
+
+    /**
+     * Starts the processor.
+     */
+    public void start() {
+        
clusterService.messagingService().addMessageHandler(TxMessageGroup.class, (msg, 
sender, correlationId) -> {
+            if (msg instanceof TxCleanupMessage) {
+                processTxCleanup((TxCleanupMessage) msg, sender, 
correlationId);
+            }
+        });
+    }
+
+    public void stop() {
+    }
+
+    private void processTxCleanup(TxCleanupMessage txCleanupMessage, String 
senderId, @Nullable Long correlationId) {
+        assert correlationId != null;
+
+        Map<TablePartitionId, CompletableFuture<?>> writeIntentSwitches = new 
HashMap<>();
+
+        // These cleanups will all be local.
+        Collection<ReplicationGroupId> groups = txCleanupMessage.groups();
+
+        if (groups != null) {
+            for (ReplicationGroupId group : groups) {
+                writeIntentSwitches.put((TablePartitionId) group,
+                        writeIntentSwitchProcessor.switchLocalWriteIntents(
+                                (TablePartitionId) group,
+                                txCleanupMessage.txId(),
+                                txCleanupMessage.commit(),
+                                txCleanupMessage.commitTimestamp()
+                        ));
+            }
+        }
+        // First trigger the cleanup to properly release the locks if we know 
all affected partitions on this node.
+        // If the partition collection is empty (likely to be the recovery 
case), just run 'release locks'.
+        allOf(writeIntentSwitches.values().toArray(new 
CompletableFuture<?>[0]))
+                .whenComplete((unused, ex) -> {
+                    releaseTxLocks(txCleanupMessage.txId());
+
+                    NetworkMessage msg;
+                    if (ex == null) {
+                        msg = prepareResponse();
+                    } else {
+                        msg = prepareErrorResponse(ex);
+
+                        // Run durable cleanup for the partitions that we 
failed to clean up properly.
+                        // No need to wait on this future.
+                        writeIntentSwitches.forEach((groupId, future) -> {
+                            if (future.isCompletedExceptionally()) {
+                                
writeIntentSwitchProcessor.switchWriteIntentsWithRetry(
+                                        txCleanupMessage.commit(),
+                                        txCleanupMessage.commitTimestamp(),
+                                        txCleanupMessage.txId(),
+                                        groupId
+                                );
+                            }
+                        });
+                    }
+
+                    clusterService.messagingService().respond(senderId, msg, 
correlationId);
+                });
+    }
+
+    private void releaseTxLocks(UUID txId) {
+        lockManager.locks(txId).forEachRemaining(lockManager::release);
+    }
+
+    private NetworkMessage prepareResponse() {
+        return FACTORY
+                .txCleanupMessageResponse()
+                .timestampLong(hybridClock.nowLong())
+                .build();
+    }
+
+    private NetworkMessage prepareErrorResponse(Throwable th) {
+        return FACTORY
+                .txCleanupMessageErrorResponse()
+                .throwable(th)
+                .timestampLong(hybridClock.nowLong())
+                .build();
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
new file mode 100644
index 0000000000..46815c062f
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sends transaction cleanup requests to the nodes that host the primary replicas of the enlisted partitions.
+ */
+public class TxCleanupRequestSender {
+    /** Placement driver helper. */
+    private final PlacementDriverHelper placementDriverHelper;
+
+    /** Cleanup processor. */
+    private final WriteIntentSwitchProcessor writeIntentSwitchProcessor;
+
+    /** Transaction message sender. */
+    private final TxMessageSender txMessageSender;
+
+    /**
+     * The constructor.
+     *
+     * @param txMessageSender Message sender.
+     * @param placementDriverHelper Placement driver helper.
+     * @param writeIntentSwitchProcessor A cleanup processor.
+     */
+    public TxCleanupRequestSender(
+            TxMessageSender txMessageSender,
+            PlacementDriverHelper placementDriverHelper,
+            WriteIntentSwitchProcessor writeIntentSwitchProcessor
+    ) {
+        this.txMessageSender = txMessageSender;
+        this.placementDriverHelper = placementDriverHelper;
+        this.writeIntentSwitchProcessor = writeIntentSwitchProcessor;
+    }
+
+    /**
+     * Sends cleanup request to the primary nodes of each one of {@code partitions}.
+     *
+     * <p>Partitions for which no primary replica was found are handed over to the write intent switch processor; that operation is
+     * started but not waited for. The returned future tracks only the cleanup messages sent to the nodes with a known primary.
+     *
+     * @param partitions Enlisted partition groups.
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @return Completable future of Void.
+     */
+    public CompletableFuture<Void> cleanup(
+            Collection<TablePartitionId> partitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        return placementDriverHelper.findPrimaryReplicas(partitions)
+                .thenCompose(partitionData -> {
+                    switchWriteIntentsOnPartitions(commit, commitTimestamp, txId, partitionData.partitionsWithoutPrimary);
+
+                    return cleanupPartitions(partitionData.partitionsByNode, commit, commitTimestamp, txId);
+                });
+    }
+
+    /**
+     * Starts a write intent switch (with retries) for every partition that currently has no primary replica.
+     * The resulting futures are intentionally not awaited.
+     *
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @param noPrimaryFound Partitions for which no primary replica was found.
+     */
+    private void switchWriteIntentsOnPartitions(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            Set<TablePartitionId> noPrimaryFound
+    ) {
+        for (TablePartitionId partition : noPrimaryFound) {
+            // Okay, no primary found for that partition.
+            // Means the old one is no longer primary thus the locks were released.
+            // All we need to do is to wait for the new primary to appear and cleanup write intents.
+            writeIntentSwitchProcessor.switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partition);
+        }
+    }
+
+    /**
+     * Sends one cleanup message per node and combines the results.
+     *
+     * @param partitionsByNode Partitions grouped by the node they are sent to.
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @return Future that completes when the cleanup of every node has completed.
+     */
+    private CompletableFuture<Void> cleanupPartitions(
+            Map<String, Set<TablePartitionId>> partitionsByNode,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        List<CompletableFuture<Void>> cleanupFutures = new ArrayList<>(partitionsByNode.size());
+
+        for (Entry<String, Set<TablePartitionId>> entry : partitionsByNode.entrySet()) {
+            String node = entry.getKey();
+            Set<TablePartitionId> nodePartitions = entry.getValue();
+
+            cleanupFutures.add(sendCleanupMessageWithRetries(commit, commitTimestamp, txId, node, nodePartitions));
+        }
+
+        return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
+    }
+
+    /**
+     * Sends the cleanup message for {@code partitions} to {@code node}. On a recoverable failure the whole cleanup procedure
+     * is restarted for this subset of partitions; any other failure is propagated to the caller.
+     *
+     * <p>NOTE: the number of retries is not bounded here, a recoverable failure always leads to another attempt.
+     *
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @param node Node to send the message to.
+     * @param partitions Partitions to clean up on that node.
+     * @return Future of the cleanup (including the retries).
+     */
+    private CompletableFuture<Void> sendCleanupMessageWithRetries(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            String node,
+            Collection<TablePartitionId> partitions
+    ) {
+        // A typed copy replaces the unchecked double cast: a TablePartitionId is a ReplicationGroupId,
+        // so the copy constructor accepts the collection as is.
+        Collection<ReplicationGroupId> enlistedPartitions = new ArrayList<>(partitions);
+
+        return txMessageSender.cleanup(node, enlistedPartitions, txId, commit, commitTimestamp)
+                .handle((networkMessage, throwable) -> {
+                    if (throwable == null) {
+                        return CompletableFutures.<Void>nullCompletedFuture();
+                    }
+
+                    if (TransactionFailureHandler.isRecoverable(throwable)) {
+                        // In the case of a failure we repeat the process, but start with finding correct primary replicas
+                        // for this subset of partitions. If nothing changed in terms of the nodes and primaries
+                        // we eventually will call ourselves with the same parameters.
+                        // On the other hand (for example if this node has died) we will
+                        // either have a new mapping of primary to its partitions
+                        // or will run `switchWriteIntentsOnPartitions` for partitions with no primary.
+                        // At the end of the day all write intents will be properly converted.
+                        return cleanup(partitions, commit, commitTimestamp, txId);
+                    }
+
+                    return CompletableFuture.<Void>failedFuture(throwable);
+                })
+                .thenCompose(future -> future);
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 97f0172826..507a7e4e41 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -20,19 +20,15 @@ package org.apache.ignite.internal.tx.impl;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_WAS_ABORTED_ERR;
 
@@ -65,7 +61,6 @@ import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -84,8 +79,6 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
-import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
-import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -107,16 +100,9 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Hint for maximum concurrent txns. */
     private static final int MAX_CONCURRENT_TXNS = 1024;
 
-    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
-
-    /** Tx messages factory. */
-    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
-
     /** Transaction configuration. */
     private final TransactionConfiguration txConfig;
 
-    private final ReplicaService replicaService;
-
     /** Lock manager. */
     private final LockManager lockManager;
 
@@ -148,6 +134,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
     private final PlacementDriver placementDriver;
 
+    private final PlacementDriverHelper placementDriverHelper;
+
     private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
 
     /** Prevents double stopping of the tracker. */
@@ -165,6 +153,21 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Local node network identity. This id is available only after the 
network has started. */
     private String localNodeId;
 
+    /**
+     * Server cleanup processor.
+     */
+    private final TxCleanupRequestHandler txCleanupRequestHandler;
+
+    /**
+     * Cleanup request sender.
+     */
+    private final TxCleanupRequestSender txCleanupRequestSender;
+
+    /**
+     * Transaction message sender.
+     */
+    private final TxMessageSender txMessageSender;
+
     /**
      * The constructor.
      *
@@ -188,7 +191,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             LongSupplier idleSafeTimePropagationPeriodMsSupplier
     ) {
         this.txConfig = txConfig;
-        this.replicaService = replicaService;
         this.lockManager = lockManager;
         this.clock = clock;
         this.transactionIdGenerator = transactionIdGenerator;
@@ -196,6 +198,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.placementDriver = placementDriver;
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
 
+        placementDriverHelper = new PlacementDriverHelper(placementDriver, 
clock);
+
         int cpus = Runtime.getRuntime().availableProcessors();
 
         cleanupExecutor = new ThreadPoolExecutor(
@@ -207,6 +211,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 new NamedThreadFactory("tx-async-cleanup", LOG));
 
         orphanDetector = new OrphanDetector(clusterService.topologyService(), 
replicaService, placementDriver, lockManager, clock);
+
+        txMessageSender = new TxMessageSender(clusterService, replicaService, 
clock);
+
+        WriteIntentSwitchProcessor writeIntentSwitchProcessor =
+                new WriteIntentSwitchProcessor(placementDriverHelper, 
txMessageSender, clusterService);
+
+        txCleanupRequestHandler = new TxCleanupRequestHandler(clusterService, 
lockManager, clock, writeIntentSwitchProcessor);
+
+        txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, 
placementDriverHelper, writeIntentSwitchProcessor);
     }
 
     @Override
@@ -281,7 +294,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     }
 
     @Override
-    public TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, 
TxStateMeta> updater) {
+    public @Nullable TxStateMeta updateTxMeta(UUID txId, Function<TxStateMeta, 
TxStateMeta> updater) {
         return inBusyLock(busyLock, () -> 
txStateVolatileStorage.updateMeta(txId, oldMeta -> {
             TxStateMeta newMeta = updater.apply(oldMeta);
 
@@ -410,7 +423,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
     ) {
-        return inBusyLockAsync(busyLock, () -> 
findPrimaryReplica(commitPartition, clock.now())
+        return inBusyLockAsync(busyLock, () -> 
placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition)
                 .thenCompose(meta ->
                         makeFinishRequest(
                                 observableTimestampTracker,
@@ -481,18 +494,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         LOG.debug("Finish [partition={}, node={}, term={} commit={}, txId={}, 
groups={}",
                 commitPartition, primaryConsistentId, term, commit, txId, 
replicationGroupIds);
 
-        TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
-                .txId(txId)
-                .timestampLong(clock.nowLong())
-                .groupId(commitPartition)
-                .groups(replicationGroupIds)
-                // In case of verification future failure transaction will be 
rolled back.
-                .commit(commit)
-                .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                .enlistmentConsistencyToken(term)
-                .build();
-
-        return replicaService.invoke(primaryConsistentId, req)
+        return txMessageSender.finish(primaryConsistentId, commitPartition, 
replicationGroupIds, txId, term, commit, commitTimestamp)
                 .thenRun(() -> {
                     updateTxMeta(txId, old -> {
                         if (isFinalState(old.txState())) {
@@ -520,41 +522,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 });
     }
 
-    private CompletableFuture<ReplicaMeta> findPrimaryReplica(TablePartitionId 
partitionId, HybridTimestamp now) {
-        return placementDriver.awaitPrimaryReplica(partitionId, now, 
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
-                .handle((primaryReplica, e) -> {
-                    if (e != null) {
-                        LOG.error("Failed to retrieve primary replica for 
partition {}", e, partitionId);
-
-                        throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR,
-                                "Failed to get the primary replica"
-                                        + " [tablePartitionId=" + partitionId 
+ ", awaitTimestamp=" + now + ']', e);
-                    }
-
-                    return primaryReplica;
-                });
-    }
-
-    @Override
-    public CompletableFuture<Void> cleanup(
-            String primaryConsistentId,
-            TablePartitionId tablePartitionId,
-            UUID txId,
-            boolean commit,
-            @Nullable HybridTimestamp commitTimestamp
-    ) {
-        return replicaService.invoke(
-                primaryConsistentId,
-                FACTORY.txCleanupReplicaRequest()
-                        .groupId(tablePartitionId)
-                        .timestampLong(clock.nowLong())
-                        .txId(txId)
-                        .commit(commit)
-                        
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                        .build()
-        );
-    }
-
     @Override
     public int finished() {
         return inBusyLock(busyLock, () -> (int) 
txStateVolatileStorage.states().stream()
@@ -577,6 +544,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         txStateVolatileStorage.start();
 
         orphanDetector.start(txStateVolatileStorage, 
txConfig.abandonedCheckTs());
+
+        txCleanupRequestHandler.start();
     }
 
     @Override
@@ -586,13 +555,15 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     }
 
     @Override
-    public void stop() {
+    public void stop() throws Exception {
         if (!stopGuard.compareAndSet(false, true)) {
             return;
         }
 
         busyLock.block();
 
+        txCleanupRequestHandler.stop();
+
         shutdownAndAwaitTermination(cleanupExecutor, 10, TimeUnit.SECONDS);
     }
 
@@ -601,6 +572,16 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         return lockManager;
     }
 
+    @Override
+    public CompletableFuture<Void> cleanup(
+            Collection<TablePartitionId> partitions,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId
+    ) {
+        // The whole cleanup procedure is delegated to the cleanup request sender.
+        return txCleanupRequestSender.cleanup(partitions, commit, 
commitTimestamp, txId);
+    }
+
     @Override
     public CompletableFuture<Void> executeCleanupAsync(Runnable runnable) {
         return runAsync(runnable, cleanupExecutor);
@@ -808,7 +789,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         }
     }
 
-    private static class TransactionFailureHandler {
+    static class TransactionFailureHandler {
         private static final Set<Class<? extends Throwable>> RECOVERABLE = 
Set.of(
                 TimeoutException.class,
                 IOException.class,
@@ -839,4 +820,3 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         }
     }
 }
-
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
new file mode 100644
index 0000000000..3016c3dd96
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for interacting with the messaging layer. Sends transaction messages
+ * either through the replica service (write intent switch, finish) or through the messaging service (cleanup).
+ */
+public class TxMessageSender {
+
+    /** Timeout passed to the messaging service when the cleanup message is invoked (presumably milliseconds - TODO confirm). */
+    private static final long RPC_TIMEOUT = 3000;
+
+    /** Tx messages factory. */
+    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** Replica service. */
+    private final ReplicaService replicaService;
+
+    /** A hybrid logical clock. */
+    private final HybridClock clock;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param replicaService Replica service.
+     * @param clock A hybrid logical clock.
+     */
+    public TxMessageSender(ClusterService clusterService, ReplicaService 
replicaService, HybridClock clock) {
+        this.clusterService = clusterService;
+        this.replicaService = replicaService;
+        this.clock = clock;
+    }
+
+    /**
+     * Sends WriteIntentSwitch request to the specified primary replica.
+     *
+     * @param primaryConsistentId Primary replica to process given write intent switch request.
+     * @param tablePartitionId Table partition id.
+     * @param txId Transaction id.
+     * @param commit {@code True} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @return Completable future of Void.
+     */
+    public CompletableFuture<Void> switchWriteIntents(
+            String primaryConsistentId,
+            TablePartitionId tablePartitionId,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return replicaService.invoke(
+                primaryConsistentId,
+                FACTORY.writeIntentSwitchReplicaRequest()
+                        .groupId(tablePartitionId)
+                        .timestampLong(clock.nowLong())
+                        .txId(txId)
+                        .commit(commit)
+                        
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                        .build()
+        );
+    }
+
+    /**
+     * Sends cleanup message to the specified node through the messaging service, using {@link #RPC_TIMEOUT} as the timeout.
+     *
+     * @param primaryConsistentId Node to process given cleanup request.
+     * @param replicationGroupIds Replication groups (table partition ids) the cleanup is requested for.
+     * @param txId Transaction id.
+     * @param commit {@code True} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @return Completable future of the network message received in response.
+     */
+    public CompletableFuture<NetworkMessage> cleanup(
+            String primaryConsistentId,
+            Collection<ReplicationGroupId> replicationGroupIds,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return clusterService.messagingService().invoke(
+                primaryConsistentId,
+                FACTORY.txCleanupMessage()
+                        .txId(txId)
+                        .commit(commit)
+                        
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                        .timestampLong(clock.nowLong())
+                        .groups(replicationGroupIds)
+                        .build(),
+                RPC_TIMEOUT);
+    }
+
+    /**
+     * Send a transactions finish request.
+     *
+     * @param primaryConsistentId Node id to send the request to.
+     * @param commitPartition Partition to store a transaction state.
+     * @param replicationGroupIds Enlisted partition groups.
+     * @param txId Transaction id.
+     * @param term Value sent as the enlistment consistency token of the request.
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @return Completable future of Void.
+     */
+    public CompletableFuture<Void> finish(
+            String primaryConsistentId,
+            TablePartitionId commitPartition,
+            Collection<ReplicationGroupId> replicationGroupIds,
+            UUID txId,
+            Long term,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return replicaService.invoke(
+                primaryConsistentId,
+                FACTORY.txFinishReplicaRequest()
+                        .txId(txId)
+                        .timestampLong(clock.nowLong())
+                        .groupId(commitPartition)
+                        .groups(replicationGroupIds)
+                        .commit(commit)
+                        
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                        .enlistmentConsistencyToken(term)
+                        .build());
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
new file mode 100644
index 0000000000..eccd5d058d
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WriteIntentSwitchProcessor.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.network.ClusterService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sends requests to switch write intents (to normal value for a commit and remove for an abort).
+ */
+public class WriteIntentSwitchProcessor {
+    /** The logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(WriteIntentSwitchProcessor.class);
+
+    /** Number of retries performed after the first failed attempt to switch write intents. */
+    private static final int ATTEMPTS_TO_SWITCH_WI = 5;
+
+    /** Placement driver helper. */
+    private final PlacementDriverHelper placementDriverHelper;
+
+    /** Transaction message sender. */
+    private final TxMessageSender txMessageSender;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /**
+     * The constructor.
+     *
+     * @param placementDriverHelper Placement driver helper.
+     * @param txMessageSender Transaction message creator.
+     * @param clusterService Cluster service.
+     */
+    public WriteIntentSwitchProcessor(
+            PlacementDriverHelper placementDriverHelper,
+            TxMessageSender txMessageSender,
+            ClusterService clusterService
+    ) {
+        this.placementDriverHelper = placementDriverHelper;
+        this.txMessageSender = txMessageSender;
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Run switch write intent on the local node: the request is addressed to the local topology member.
+     *
+     * @param tablePartitionId Table partition id.
+     * @param txId Transaction id.
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @return Completable future of Void.
+     */
+    public CompletableFuture<Void> switchLocalWriteIntents(
+            TablePartitionId tablePartitionId,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        String localNodeName = clusterService.topologyService().localMember().name();
+
+        return txMessageSender.switchWriteIntents(localNodeName, tablePartitionId, txId, commit, commitTimestamp);
+    }
+
+    /**
+     * Run switch write intent on the primary node of the provided partition in a durable manner.
+     * A failed attempt is retried up to {@link #ATTEMPTS_TO_SWITCH_WI} times.
+     *
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @param partitionId Partition to switch write intents on.
+     * @return Future that fails with the last error when all the attempts have failed.
+     */
+    public CompletableFuture<Void> switchWriteIntentsWithRetry(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            TablePartitionId partitionId
+    ) {
+        return switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_SWITCH_WI);
+    }
+
+    /**
+     * Awaits the primary replica of the partition, sends the write intent switch request to it, and retries the whole
+     * sequence on failure while {@code attempts} is positive.
+     *
+     * @param commit {@code true} if a commit requested.
+     * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
+     * @param txId Transaction id.
+     * @param partitionId Partition to switch write intents on.
+     * @param attempts Number of retries left.
+     * @return Future of the operation (including the retries).
+     */
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20681 remove attempts count.
+    private CompletableFuture<Void> switchWriteIntentsWithRetry(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            TablePartitionId partitionId,
+            int attempts
+    ) {
+        return placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(partitionId)
+                .thenCompose(leaseHolder ->
+                        txMessageSender.switchWriteIntents(leaseHolder.getLeaseholder(), partitionId, txId, commit, commitTimestamp))
+                .handle((res, ex) -> {
+                    if (ex == null) {
+                        return CompletableFutures.<Void>nullCompletedFuture();
+                    }
+
+                    // The throwable goes before the message parameters, otherwise it is treated as an extra
+                    // format argument (the message has a single placeholder) and the failure cause is not logged.
+                    if (attempts > 0) {
+                        LOG.warn("Failed to switch write intents for Tx. The operation will be retried [txId={}].", ex, txId);
+
+                        return switchWriteIntentsWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1);
+                    }
+
+                    LOG.warn("Failed to switch write intents for Tx [txId={}].", ex, txId);
+
+                    return CompletableFuture.<Void>failedFuture(ex);
+                })
+                .thenCompose(Function.identity());
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
similarity index 73%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
index 2032cf1352..774b77ac6f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
@@ -19,30 +19,37 @@ package org.apache.ignite.internal.tx.message;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 
+import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Transaction cleanup replica request that will trigger following actions 
processing.
- *
- *  <ol>
- *      <li>Convert all pending entries(writeIntents) to either regular 
values(TxState.COMMITED) or remove them (TxState.ABORTED).</li>
- *      <li>Release all locks that were held on local replica by given 
transaction.</li>
- *  </ol>
+ * Cleanup transaction message.
  */
-@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST)
-public interface TxCleanupReplicaRequest extends ReplicaRequest, 
TimestampAware {
+@Transferable(TxMessageGroup.TX_CLEANUP_MSG)
+public interface TxCleanupMessage extends TimestampAware {
     /**
-     * Returns transaction Id.
+     * Gets the id of the transaction to clean up.
      *
      * @return Transaction id.
      */
     UUID txId();
 
+    /**
+     * Returns replication groups aggregated by expected primary replica nodes.
+     * Null when this message is sent at recovery.
+     *
+     * @return Replication groups aggregated by expected primary replica nodes.
+     */
+    @Marshallable
+    @Nullable
+    Collection<ReplicationGroupId> groups();
+
     /**
      * Returns {@code True} if a commit request.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
new file mode 100644
index 0000000000..0e09ab6433
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Cleanup transaction message error response.
+ */
+@Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE)
+public interface TxCleanupMessageErrorResponse extends 
TxCleanupMessageResponse {
+    /**
+     * Returns the {@link Throwable} that was thrown while handling a 
transaction cleanup message.
+     *
+     * @return {@link Throwable} that was thrown while handling a 
transaction cleanup message.
+     */
+    @Marshallable
+    Throwable throwable();
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
new file mode 100644
index 0000000000..123391e0c7
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Cleanup transaction message response.
+ */
+@Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
+public interface TxCleanupMessageResponse extends TimestampAware {
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 2aed7d0c01..3e22292a2a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -35,9 +35,9 @@ public class TxMessageGroup {
     public static final short TX_FINISH_RESPONSE = 1;
 
     /**
-     * Message type for {@link TxCleanupReplicaRequest}.
+     * Message type for {@link WriteIntentSwitchReplicaRequest}.
      */
-    public static final short TX_CLEANUP_REQUEST = 2;
+    public static final short WRITE_INTENT_SWITCH_REQUEST = 2;
 
     /**
      * Message type for {@link TxStateCommitPartitionRequest}.
@@ -58,4 +58,19 @@ public class TxMessageGroup {
      * Message type for {@link TxRecoveryMessage}.
      */
     public static final short TX_RECOVERY_MSG = 6;
+
+    /**
+     * Message type for {@link TxCleanupMessage}.
+     */
+    public static final short TX_CLEANUP_MSG = 7;
+
+    /**
+     * Message type for {@link TxCleanupMessageResponse}.
+     */
+    public static final short TX_CLEANUP_MSG_RESPONSE = 8;
+
+    /**
+     * Message type for {@link TxCleanupMessageErrorResponse}.
+     */
+    public static final short TX_CLEANUP_MSG_ERR_RESPONSE = 9;
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java
similarity index 80%
rename from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
rename to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java
index 2032cf1352..d4417255ff 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/WriteIntentSwitchReplicaRequest.java
@@ -27,15 +27,11 @@ import org.apache.ignite.network.annotations.Transferable;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Transaction cleanup replica request that will trigger following actions 
processing.
- *
- *  <ol>
- *      <li>Convert all pending entries(writeIntents) to either regular 
values(TxState.COMMITED) or remove them (TxState.ABORTED).</li>
- *      <li>Release all locks that were held on local replica by given 
transaction.</li>
- *  </ol>
+ * A replica request that either triggers the conversion of all pending 
entries (write intents) to regular values (TxState.COMMITED)
+ * or removes them (TxState.ABORTED).
  */
-@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST)
-public interface TxCleanupReplicaRequest extends ReplicaRequest, 
TimestampAware {
+@Transferable(TxMessageGroup.WRITE_INTENT_SWITCH_REQUEST)
+public interface WriteIntentSwitchReplicaRequest extends ReplicaRequest, 
TimestampAware {
     /**
      * Returns transaction Id.
      *
diff --git a/modules/transactions/tech-notes/cleanup.puml 
b/modules/transactions/tech-notes/cleanup.puml
new file mode 100644
index 0000000000..cf0f386c2f
--- /dev/null
+++ b/modules/transactions/tech-notes/cleanup.puml
@@ -0,0 +1,51 @@
+@startuml
+
+!pragma teoz true
+box "Commit partition" #LightGreen
+participant PartitionReplicaListener
+box "TxManager"
+participant TxCleanupRequestSender
+participant WriteIntentSwitchProcessor as WISP1
+end box
+participant PlacementDriver
+
+end box
+
+box "Partition primary #i" #LightBlue
+box "TxManager"
+participant TxCleanupRequestHandler
+participant WriteIntentSwitchProcessor as WISP2
+end box
+participant PartitionReplicaListener as PRL2
+participant LockManager
+end box
+
+box "New primary for a partition\n (old one stepped down)" #LightGray
+participant PartitionReplicaListener as PRL3
+end box
+rnote over PartitionReplicaListener
+ Finish transaction
+ (out of the scope)
+endrnote
+PartitionReplicaListener -> TxCleanupRequestSender: cleanup
+
+TxCleanupRequestSender -> PlacementDriver: For each enlisted partition\n do 
'getPrimaryReplica'
+return
+
+alt Executes for each available primary node
+
+TxCleanupRequestSender -> TxCleanupRequestHandler: send \ntxCleanupMessage
+TxCleanupRequestHandler -> WISP2: switchWriteIntent
+WISP2 -> PRL2: send \nwriteIntentSwitchReplicaRequest
+return
+WISP2 --> TxCleanupRequestHandler
+TxCleanupRequestHandler->LockManager: releaseTxLocks
+return
+TxCleanupRequestHandler --> TxCleanupRequestSender
+else no primary found
+TxCleanupRequestSender -> WISP1: switchWriteIntentsWithRetry
+WISP1 -> PRL3: send \nwriteIntentSwitchReplicaRequest
+end
+TxCleanupRequestSender --> PartitionReplicaListener
+PartitionReplicaListener -> PartitionReplicaListener : markLocksReleased
+@enduml

Reply via email to