This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a4ea716e72 IGNITE-22631 Get rid of @Marshallable in 
org.apache.ignite.internal.tx.message (#4026)
a4ea716e72 is described below

commit a4ea716e7255bff749ebce11b2ae70eb4aef4216
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 2 18:17:00 2024 +0300

    IGNITE-22631 Get rid of @Marshallable in 
org.apache.ignite.internal.tx.message (#4026)
---
 .../network/PartitionReplicationMessageGroup.java  |  4 --
 .../network/PartitionReplicationMessageUtils.java  | 57 ----------------
 .../network/raft/SnapshotTxDataResponse.java       |  1 +
 .../ignite/internal/table/ItDurableFinishTest.java |  4 +-
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |  8 ++-
 .../replicator/PartitionReplicaListener.java       | 13 +++-
 .../replicator/TransactionStateResolver.java       | 23 +++++--
 .../incoming/IncomingSnapshotCopierTest.java       |  8 ++-
 .../replication/PartitionReplicaListenerTest.java  | 34 ++++++----
 .../apache/ignite/internal/tx/TransactionMeta.java |  6 ++
 .../java/org/apache/ignite/internal/tx/TxMeta.java | 25 ++++++-
 .../org/apache/ignite/internal/tx/TxStateMeta.java | 33 +++++++--
 .../ignite/internal/tx/TxStateMetaAbandoned.java   | 27 +++++++-
 .../ignite/internal/tx/TxStateMetaFinishing.java   | 23 +++++++
 .../internal/tx/impl/TxCleanupRequestHandler.java  | 79 +++++++++++++++-------
 .../internal/tx/impl/TxCleanupRequestSender.java   | 10 ++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  7 +-
 .../ignite/internal/tx/impl/TxMessageSender.java   | 57 ++++++++++++----
 .../tx/message/CleanupReplicatedInfoMessage.java   | 48 +++++++++++++
 .../tx/message/TransactionMetaMessage.java}        | 31 ++-------
 .../internal/tx/message/TxCleanupMessage.java      |  9 +--
 .../tx/message/TxCleanupMessageResponse.java       |  7 +-
 .../tx/message/TxFinishReplicaRequest.java         | 12 +---
 .../ignite/internal/tx/message/TxMessageGroup.java | 14 ++++
 .../ignite/internal/tx/message}/TxMetaMessage.java | 39 +++--------
 ...ponse.java => TxStateMetaAbandonedMessage.java} | 33 ++++++---
 ...ponse.java => TxStateMetaFinishingMessage.java} | 30 +++++---
 .../internal/tx/message/TxStateMetaMessage.java    | 59 ++++++++++++++++
 .../internal/tx/message/TxStateResponse.java       |  7 +-
 29 files changed, 460 insertions(+), 248 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 7ee9efef57..05ef537b46 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -38,7 +38,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
-import 
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
@@ -186,9 +185,6 @@ public interface PartitionReplicationMessageGroup {
      */
     short TIMED_BINARY_ROW_MESSAGE = 24;
 
-    /** Message type for {@link TxMetaMessage}. */
-    short TX_META_MESSAGE = 25;
-
     /**
      * Message types for partition replicator module RAFT commands.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
deleted file mode 100644
index d4f8c799ba..0000000000
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.partition.replicator.network;
-
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
-
-import java.util.ArrayList;
-import 
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
-import org.apache.ignite.internal.tx.TxMeta;
-
-/** Class that can contain useful constants and methods for working with 
messages from {@link PartitionReplicationMessageGroup}. */
-public class PartitionReplicationMessageUtils {
-    /**
-     * Converts to a network message.
-     *
-     * @param partitionReplicationMessagesFactory Partition replication 
messages factory.
-     * @param replicaMessagesFactory Replica messages factory.
-     * @param txMeta Transaction meta.
-     * @return New instance of network message.
-     */
-    public static TxMetaMessage toTxMetaMessage(
-            PartitionReplicationMessagesFactory 
partitionReplicationMessagesFactory,
-            ReplicaMessagesFactory replicaMessagesFactory,
-            TxMeta txMeta
-    ) {
-        var enlistedPartitionMessages = new 
ArrayList<TablePartitionIdMessage>(txMeta.enlistedPartitions().size());
-
-        for (TablePartitionId enlistedPartition : txMeta.enlistedPartitions()) 
{
-            
enlistedPartitionMessages.add(toTablePartitionIdMessage(replicaMessagesFactory, 
enlistedPartition));
-        }
-
-        return partitionReplicationMessagesFactory.txMetaMessage()
-                .txStateInt(txMeta.txState().ordinal())
-                
.commitTimestampLong(hybridTimestampToLong(txMeta.commitTimestamp()))
-                .enlistedPartitions(enlistedPartitionMessages)
-                .build();
-    }
-}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
index 96ee5f5a6a..ece9048541 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/SnapshotTxDataResponse.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
 
 /**
  * Snapshot TX state partition data response message.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 66c3f45e55..4055bcba2d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -54,7 +54,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxStateMeta;
-import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
 import org.apache.ignite.internal.tx.message.TxCleanupMessage;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
@@ -350,7 +350,7 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
                     return false;
                 }
 
-                CleanupReplicatedInfo result = message.result();
+                CleanupReplicatedInfoMessage result = message.result();
 
                 if (result != null) {
                     cleanupReplicatedFuture.complete(null);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index f7751ddb0b..7244c4eb2c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
-import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageUtils.toTxMetaMessage;
 import static 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.collectNextRowIdToBuildIndexes;
 import static 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
 
@@ -42,7 +41,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDa
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
-import 
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -52,6 +50,8 @@ import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
 import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
@@ -70,6 +70,8 @@ public class OutgoingSnapshot {
 
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
     private final UUID id;
 
     private final PartitionAccess partition;
@@ -419,7 +421,7 @@ public class OutgoingSnapshot {
 
         for (IgniteBiTuple<UUID, TxMeta> row : rows) {
             txIds.add(row.getKey());
-            
txMetas.add(toTxMetaMessage(PARTITION_REPLICATION_MESSAGES_FACTORY, 
REPLICA_MESSAGES_FACTORY, row.getValue()));
+            
txMetas.add(row.getValue().toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, 
TX_MESSAGES_FACTORY));
         }
 
         return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotTxDataResponse()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 2b34bff597..8954ff184c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -70,6 +70,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -1624,7 +1625,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<TransactionResult> 
processTxFinishAction(TxFinishReplicaRequest request) {
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use 
ZonePartitionIdMessage and remove cast
-        Map<TablePartitionId, String> enlistedGroups = (Map<TablePartitionId, 
String>) (Map<?, ?>) request.groups();
+        Map<TablePartitionId, String> enlistedGroups = 
asTablePartitionIdStringMap(request.groups());
 
         UUID txId = request.txId();
 
@@ -4169,4 +4170,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return result;
         }
     }
+
+    private static Map<TablePartitionId, String> 
asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+        var result = new HashMap<TablePartitionId, String>(messages.size());
+
+        for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+            result.put(e.getKey().asTablePartitionId(), e.getValue());
+        }
+
+        return result;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index 86b57b7517..97f82b19d3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.impl.PlacementDriverHelper;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
@@ -53,7 +55,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TransactionStateResolver {
     /** Tx messages factory. */
-    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    /** Replica messages factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
     // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this 
ticket this resolver will be no longer needed, as
     // TODO we will store coordinator as ClusterNode in local tx state map.
@@ -110,8 +115,8 @@ public class TransactionStateResolver {
 
                 processTxStateRequest(req)
                         .thenAccept(txStateMeta -> {
-                            NetworkMessage response = FACTORY.txStateResponse()
-                                    .txStateMeta(txStateMeta)
+                            NetworkMessage response = 
TX_MESSAGES_FACTORY.txStateResponse()
+                                    
.txStateMeta(toTransactionMetaMessage(txStateMeta))
                                     .timestampLong(clockService.nowLong())
                                     .build();
 
@@ -218,7 +223,7 @@ public class TransactionStateResolver {
             txMessageSender.resolveTxStateFromCoordinator(coordinator.name(), 
txId, timestamp)
                     .whenComplete((response, e) -> {
                         if (e == null) {
-                            txMetaFuture.complete(response.txStateMeta());
+                            
txMetaFuture.complete(asTransactionMeta(response.txStateMeta()));
                         } else {
                             resolveTxStateFromCommitPartition(txId, 
commitGrpId, txMetaFuture);
                         }
@@ -291,7 +296,7 @@ public class TransactionStateResolver {
      * @param request Request.
      * @return Future that should be completed with transaction state meta.
      */
-    private CompletableFuture<TransactionMeta> 
processTxStateRequest(TxStateCoordinatorRequest request) {
+    private CompletableFuture<@Nullable TransactionMeta> 
processTxStateRequest(TxStateCoordinatorRequest request) {
         clockService.updateClock(request.readTimestamp());
 
         UUID txId = request.txId();
@@ -308,4 +313,12 @@ public class TransactionStateResolver {
             return completedFuture(txStateMeta);
         }
     }
+
+    private static @Nullable TransactionMetaMessage 
toTransactionMetaMessage(@Nullable TransactionMeta transactionMeta) {
+        return transactionMeta == null ? null : 
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, 
TX_MESSAGES_FACTORY);
+    }
+
+    private static @Nullable TransactionMeta asTransactionMeta(@Nullable 
TransactionMetaMessage transactionMetaMessage) {
+        return transactionMetaMessage == null ? null : 
transactionMetaMessage.asTransactionMeta();
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 356c7682a4..0a55b88263 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -21,7 +21,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageUtils.toTxMetaMessage;
 import static 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
@@ -77,7 +76,6 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMeta
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
-import 
org.apache.ignite.internal.partition.replicator.network.raft.TxMetaMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -108,6 +106,8 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -148,6 +148,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
     private static final LowWatermarkMessagesFactory LWM_MSG_FACTORY = new 
LowWatermarkMessagesFactory();
 
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
 
     private final ClusterNode clusterNode = mock(ClusterNode.class);
@@ -285,7 +287,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
             List<TxMetaMessage> txMetas = txIds.stream()
                     .map(outgoingTxStatePartitionStorage::get)
-                    .map(txMeta -> toTxMetaMessage(TABLE_MSG_FACTORY, 
REPLICA_MESSAGES_FACTORY, txMeta))
+                    .map(txMeta -> 
txMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, TX_MESSAGES_FACTORY))
                     .collect(toList());
 
             return 
completedFuture(TABLE_MSG_FACTORY.snapshotTxDataResponse().txIds(txIds).txMeta(txMetas).finish(true).build());
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 19202c05fd..8c0947fe1d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -19,7 +19,9 @@ package 
org.apache.ignite.internal.table.distributed.replication;
 
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
@@ -152,7 +154,6 @@ import 
org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
@@ -218,9 +219,11 @@ import 
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
+import org.apache.ignite.internal.tx.message.TxStateResponse;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
@@ -565,12 +568,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             if (argument instanceof TxStateCoordinatorRequest) {
                 TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) 
argument;
 
-                var resp = new 
TxMessagesFactory().txStateResponse().txStateMeta(txManager.stateMeta(req.txId())).build();
-
-                return completedFuture(resp);
+                return 
completedFuture(toTxStateResponse(txManager.stateMeta(req.txId())));
             }
 
-            return CompletableFuture.failedFuture(new Exception("Test 
exception"));
+            return failedFuture(new Exception("Test exception"));
         }).when(messagingService).invoke(any(ClusterNode.class), any(), 
anyLong());
 
         doAnswer(invocation -> {
@@ -579,12 +580,10 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             if (argument instanceof TxStateCoordinatorRequest) {
                 TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) 
argument;
 
-                var resp = new 
TxMessagesFactory().txStateResponse().txStateMeta(txManager.stateMeta(req.txId())).build();
-
-                return completedFuture(resp);
+                return 
completedFuture(toTxStateResponse(txManager.stateMeta(req.txId())));
             }
 
-            return CompletableFuture.failedFuture(new Exception("Test 
exception"));
+            return failedFuture(new Exception("Test exception"));
         }).when(messagingService).invoke(anyString(), any(), anyLong());
 
         ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() {
@@ -1701,7 +1700,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .txId(txId)
-                .groups(Map.of(grpId, localNode.name()))
+                .groups(Map.of(tablePartitionIdMessage(grpId), 
localNode.name()))
                 .commit(false)
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                 .build();
@@ -1765,7 +1764,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         TxFinishReplicaRequest commitRequest = 
TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                 .groupId(tablePartitionIdMessage(grpId))
                 .txId(txId)
-                .groups(Map.of(grpId, localNode.name()))
+                .groups(Map.of(tablePartitionIdMessage(grpId), 
localNode.name()))
                 .commit(true)
                 .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
                 .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
@@ -2486,7 +2485,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     private void testCommitRequestIfTableWasDropped(
             TablePartitionId commitPartitionId,
-            Map<ReplicationGroupId, String> groups,
+            Map<TablePartitionId, String> groups,
             int tableToBeDroppedId
     ) {
         when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
@@ -2507,7 +2506,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         CompletableFuture<?> future = partitionReplicaListener.invoke(
                 TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                         .groupId(tablePartitionIdMessage(commitPartitionId))
-                        .groups(groups)
+                        .groups(groups.entrySet().stream().collect(toMap(e -> 
tablePartitionIdMessage(e.getKey()), Map.Entry::getValue)))
                         .txId(txId)
                         
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
                         .commit(true)
@@ -3092,4 +3091,13 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
         when(indexMetaStorage.indexMeta(eq(indexId))).thenReturn(indexMeta);
     }
+
+    private static @Nullable TxStateResponse toTxStateResponse(@Nullable 
TransactionMeta transactionMeta) {
+        TransactionMetaMessage transactionMetaMessage =
+                transactionMeta == null ? null : 
transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, 
TX_MESSAGES_FACTORY);
+
+        return TX_MESSAGES_FACTORY.txStateResponse()
+                .txStateMeta(transactionMetaMessage)
+                .build();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
index 975a818479..6e04440841 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionMeta.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.tx;
 
 import java.io.Serializable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TransactionMetaMessage;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,4 +33,7 @@ public interface TransactionMeta extends Serializable {
 
     /** Commit timestamp. */
     @Nullable HybridTimestamp commitTimestamp();
+
+    /** Converts to network message. */
+    TransactionMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory 
replicaMessagesFactory, TxMessagesFactory txMessagesFactory);
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index 6b91e80d29..a8e11643f3 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -18,12 +18,19 @@
 package org.apache.ignite.internal.tx;
 
 import static java.util.Collections.unmodifiableCollection;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMetaMessage;
 import org.jetbrains.annotations.Nullable;
 
 /** Transaction meta. */
@@ -38,8 +45,7 @@ public class TxMeta implements TransactionMeta {
     private final Collection<TablePartitionId> enlistedPartitions;
 
     /** Commit timestamp. */
-    @Nullable
-    private final HybridTimestamp commitTimestamp;
+    private final @Nullable HybridTimestamp commitTimestamp;
 
     /**
      * The constructor.
@@ -68,6 +74,21 @@ public class TxMeta implements TransactionMeta {
         return commitTimestamp;
     }
 
+    /**
+     * Builds a {@link TxMetaMessage} from this meta: the transaction state is sent as its enum ordinal, the commit timestamp as a long
+     * and every enlisted partition as a {@link TablePartitionIdMessage}.
+     */
+    @Override
+    public TxMetaMessage toTransactionMetaMessage(ReplicaMessagesFactory replicaMessagesFactory, TxMessagesFactory txMessagesFactory) {
+        ArrayList<TablePartitionIdMessage> partitionMessages = new ArrayList<>(enlistedPartitions.size());
+
+        enlistedPartitions.forEach(
+                partition -> partitionMessages.add(toTablePartitionIdMessage(replicaMessagesFactory, partition))
+        );
+
+        return txMessagesFactory.txMetaMessage()
+                .txStateInt(txState.ordinal())
+                .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                .enlistedPartitions(partitionMessages)
+                .build();
+    }
+
     @Override
     public String toString() {
         return S.toString(TxMeta.class, this);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index f489d4a88c..491378513f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -17,13 +17,18 @@
 
 package org.apache.ignite.internal.tx;
 
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 
 import java.util.Objects;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -34,17 +39,16 @@ public class TxStateMeta implements TransactionMeta {
 
     private final TxState txState;
 
-    private final String txCoordinatorId;
+    private final @Nullable String txCoordinatorId;
 
-    /** Identifier of the replication group that manages a transaction state. 
*/
-    private final TablePartitionId commitPartitionId;
+    /** ID of the replication group that manages a transaction state. */
+    private final @Nullable TablePartitionId commitPartitionId;
 
-    private final HybridTimestamp commitTimestamp;
+    private final @Nullable HybridTimestamp commitTimestamp;
 
-    private final Long initialVacuumObservationTimestamp;
+    private final @Nullable Long initialVacuumObservationTimestamp;
 
-    @Nullable
-    private final Long cleanupCompletionTimestamp;
+    private final @Nullable Long cleanupCompletionTimestamp;
 
     /**
      * Constructor.
@@ -154,6 +158,21 @@ public class TxStateMeta implements TransactionMeta {
         return cleanupCompletionTimestamp;
     }
 
+    /**
+     * Builds a {@link TxStateMetaMessage} mirroring the fields of this meta. The commit partition ID is converted to a message only
+     * when it is present, otherwise {@code null} is sent.
+     */
+    @Override
+    public TxStateMetaMessage toTransactionMetaMessage(
+            ReplicaMessagesFactory replicaMessagesFactory,
+            TxMessagesFactory txMessagesFactory
+    ) {
+        var commitPartitionIdMessage = commitPartitionId == null
+                ? null
+                : toTablePartitionIdMessage(replicaMessagesFactory, commitPartitionId);
+
+        return txMessagesFactory.txStateMetaMessage()
+                .txStateInt(txState.ordinal())
+                .txCoordinatorId(txCoordinatorId)
+                .commitPartitionId(commitPartitionIdMessage)
+                .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                .initialVacuumObservationTimestamp(initialVacuumObservationTimestamp)
+                .cleanupCompletionTimestamp(cleanupCompletionTimestamp)
+                .build();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
index 674283240b..0a45671e39 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaAbandoned.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.tx;
 
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaAbandonedMessage;
 import org.apache.ignite.internal.util.FastTimestamps;
 
 /**
@@ -36,9 +41,9 @@ public class TxStateMetaAbandoned extends TxStateMeta {
      * Constructor.
      *
      * @param txCoordinatorId Transaction coordinator id.
-     * @param commitPartitionId Commit partition replication group id.
+     * @param commitPartitionId Commit partition replication group ID.
      */
-    TxStateMetaAbandoned(
+    public TxStateMetaAbandoned(
             String txCoordinatorId,
             TablePartitionId commitPartitionId
     ) {
@@ -56,6 +61,24 @@ public class TxStateMetaAbandoned extends TxStateMeta {
         return lastAbandonedMarkerTs;
     }
 
+    /**
+     * Builds a {@link TxStateMetaAbandonedMessage} from this meta: the fields exposed by the parent accessors plus the last abandoned
+     * marker timestamp. The commit partition ID is converted to a message only when it is present.
+     */
+    @Override
+    public TxStateMetaAbandonedMessage toTransactionMetaMessage(
+            ReplicaMessagesFactory replicaMessagesFactory,
+            TxMessagesFactory txMessagesFactory
+    ) {
+        TablePartitionId commitPartition = commitPartitionId();
+
+        var commitPartitionMessage = commitPartition == null
+                ? null
+                : toTablePartitionIdMessage(replicaMessagesFactory, commitPartition);
+
+        return txMessagesFactory.txStateMetaAbandonedMessage()
+                .txStateInt(txState().ordinal())
+                .txCoordinatorId(txCoordinatorId())
+                .commitPartitionId(commitPartitionMessage)
+                .commitTimestampLong(hybridTimestampToLong(commitTimestamp()))
+                .initialVacuumObservationTimestamp(initialVacuumObservationTimestamp())
+                .cleanupCompletionTimestamp(cleanupCompletionTimestamp())
+                .lastAbandonedMarkerTs(lastAbandonedMarkerTs)
+                .build();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
index 13659d8070..ae1ae66bad 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMetaFinishing.java
@@ -17,9 +17,15 @@
 
 package org.apache.ignite.internal.tx;
 
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateMetaFinishingMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -56,6 +62,23 @@ public class TxStateMetaFinishing extends TxStateMeta {
         throw new UnsupportedOperationException("Can't get commit timestamp 
from FINISHING transaction state meta.");
     }
 
+    /**
+     * Converts this FINISHING meta to its network message.
+     *
+     * <p>The commit timestamp is sent as the long produced for a {@code null} timestamp instead of being read through
+     * {@code commitTimestamp()}. NOTE(review): the accessor declared right before this method unconditionally throws
+     * {@link UnsupportedOperationException} ("Can't get commit timestamp from FINISHING transaction state meta."); assuming that
+     * accessor is {@code commitTimestamp()}, calling it here would make every conversion fail - confirm against the full class.
+     *
+     * @param replicaMessagesFactory Factory used to build the commit partition ID message.
+     * @param txMessagesFactory Factory used to build the resulting message.
+     * @return Message mirroring this meta.
+     */
+    @Override
+    public TxStateMetaFinishingMessage toTransactionMetaMessage(
+            ReplicaMessagesFactory replicaMessagesFactory,
+            TxMessagesFactory txMessagesFactory
+    ) {
+        TablePartitionId commitPartitionId = commitPartitionId();
+
+        return txMessagesFactory.txStateMetaFinishingMessage()
+                .txStateInt(txState().ordinal())
+                .txCoordinatorId(txCoordinatorId())
+                .commitPartitionId(commitPartitionId == null ? null : toTablePartitionIdMessage(replicaMessagesFactory, commitPartitionId))
+                // A FINISHING transaction has no commit timestamp yet: send the "no timestamp" encoding rather than calling the
+                // throwing accessor.
+                .commitTimestampLong(hybridTimestampToLong(null))
+                .initialVacuumObservationTimestamp(initialVacuumObservationTimestamp())
+                .cleanupCompletionTimestamp(cleanupCompletionTimestamp())
+                .build();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 9796727c2c..db4035a7c2 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -33,11 +35,13 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
 import org.apache.ignite.internal.tx.message.TxCleanupMessage;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -50,7 +54,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TxCleanupRequestHandler {
     /** Tx messages factory. */
-    private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    /** Replica messages factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
     /** Messaging service. */
     private final MessagingService messagingService;
@@ -112,19 +119,22 @@ public class TxCleanupRequestHandler {
         Map<TablePartitionId, CompletableFuture<?>> writeIntentSwitches = new 
HashMap<>();
 
         // These cleanups will all be local.
-        Collection<ReplicationGroupId> groups = txCleanupMessage.groups();
+        List<TablePartitionIdMessage> groups = txCleanupMessage.groups();
 
         if (groups != null) {
-            trackPartitions(txCleanupMessage.txId(), groups, sender);
-
-            for (ReplicationGroupId group : groups) {
-                writeIntentSwitches.put((TablePartitionId) group,
-                        writeIntentSwitchProcessor.switchLocalWriteIntents(
-                                (TablePartitionId) group,
-                                txCleanupMessage.txId(),
-                                txCleanupMessage.commit(),
-                                txCleanupMessage.commitTimestamp()
-                        ).thenAccept(this::processWriteIntentSwitchResponse));
+            Set<TablePartitionId> groupSet = asTablePartitionIdSet(groups);
+
+            trackPartitions(txCleanupMessage.txId(), groupSet, sender);
+
+            for (TablePartitionId group : groupSet) {
+                CompletableFuture<Void> future = 
writeIntentSwitchProcessor.switchLocalWriteIntents(
+                        group,
+                        txCleanupMessage.txId(),
+                        txCleanupMessage.commit(),
+                        txCleanupMessage.commitTimestamp()
+                ).thenAccept(this::processWriteIntentSwitchResponse);
+
+                writeIntentSwitches.put(group, future);
             }
         }
         // First trigger the cleanup to properly release the locks if we know 
all affected partitions on this node.
@@ -164,22 +174,22 @@ public class TxCleanupRequestHandler {
     }
 
     private NetworkMessage prepareResponse() {
-        return FACTORY
+        return TX_MESSAGES_FACTORY
                 .txCleanupMessageResponse()
                 .timestampLong(clockService.nowLong())
                 .build();
     }
 
     private NetworkMessage prepareResponse(CleanupReplicatedInfo result) {
-        return FACTORY
+        return TX_MESSAGES_FACTORY
                 .txCleanupMessageResponse()
-                .result(result)
+                .result(toCleanupReplicatedInfoMessage(result))
                 .timestampLong(clockService.nowLong())
                 .build();
     }
 
     private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
-        return FACTORY
+        return TX_MESSAGES_FACTORY
                 .txCleanupMessageErrorResponse()
                 .txId(txId)
                 .throwable(th)
@@ -194,13 +204,8 @@ public class TxCleanupRequestHandler {
      * @param groups Replication groups.
      * @param sender Cleanup request sender, needed to send cleanup replicated 
response.
      */
-    private void trackPartitions(UUID txId, Collection<ReplicationGroupId> 
groups, ClusterNode sender) {
-        Set<TablePartitionId> partitions =
-                groups.stream()
-                        .map(TablePartitionId.class::cast)
-                        .collect(toSet());
-
-        writeIntentsReplicated.put(txId, new CleanupContext(sender, 
partitions, partitions));
+    private void trackPartitions(UUID txId, Set<TablePartitionId> groups, 
ClusterNode sender) {
+        writeIntentsReplicated.put(txId, new CleanupContext(sender, groups, 
groups));
     }
 
     /**
@@ -266,4 +271,28 @@ public class TxCleanupRequestHandler {
             this.initialPartitions = initialPartitions;
         }
     }
+
+    /**
+     * Converts a {@link CleanupReplicatedInfo} to its network message counterpart.
+     *
+     * @param info Cleanup replicated info to convert.
+     * @return Message carrying the transaction ID and the partitions of {@code info}.
+     */
+    private static CleanupReplicatedInfoMessage toCleanupReplicatedInfoMessage(CleanupReplicatedInfo info) {
+        Collection<TablePartitionId> partitionIds = info.partitions();
+
+        List<TablePartitionIdMessage> messages = new ArrayList<>(partitionIds.size());
+
+        partitionIds.forEach(
+                partitionId -> messages.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, partitionId))
+        );
+
+        return TX_MESSAGES_FACTORY.cleanupReplicatedInfoMessage()
+                .txId(info.txId())
+                .partitions(messages)
+                .build();
+    }
+
+    /**
+     * Converts partition ID messages to {@link TablePartitionId}s collected into a {@link HashSet}.
+     *
+     * @param messages Partition ID messages.
+     * @return Set of the converted partition IDs.
+     */
+    private static Set<TablePartitionId> asTablePartitionIdSet(List<TablePartitionIdMessage> messages) {
+        Set<TablePartitionId> partitionIds = new HashSet<>(messages.size());
+
+        for (TablePartitionIdMessage message : messages) {
+            partitionIds.add(message.asTablePartitionId());
+        }
+
+        return partitionIds;
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index e617d111f1..6334a5b691 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import 
org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.CleanupReplicatedInfoMessage;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -87,11 +87,11 @@ public class TxCleanupRequestSender {
                 // The cleanup response is sent only in the success case, 
hence no error is expected.
                 assert !(msg instanceof TxCleanupMessageErrorResponse) : 
"Cleanup error response is not expected here.";
 
-                CleanupReplicatedInfo result = ((TxCleanupMessageResponse) 
msg).result();
+                CleanupReplicatedInfoMessage result = 
((TxCleanupMessageResponse) msg).result();
 
                 assert result != null : "Result for the cleanup response 
cannot be null.";
 
-                onCleanupReplicated(result);
+                onCleanupReplicated(result.asCleanupReplicatedInfo());
             }
         });
     }
@@ -237,9 +237,7 @@ public class TxCleanupRequestSender {
             String node,
             @Nullable Collection<TablePartitionId> partitions
     ) {
-        Collection<ReplicationGroupId> enlistedPartitions = 
(Collection<ReplicationGroupId>) (Collection<?>) partitions;
-
-        return txMessageSender.cleanup(node, enlistedPartitions, txId, commit, 
commitTimestamp)
+        return txMessageSender.cleanup(node, partitions, txId, commit, 
commitTimestamp)
                 .handle((networkMessage, throwable) -> {
                     if (throwable != null) {
                         if 
(TransactionFailureHandler.isRecoverable(throwable)) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ed68f15543..deda198098 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -82,7 +82,6 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
@@ -590,7 +589,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                         (unused, throwable) -> {
                             boolean verifiedCommit = throwable == null && 
commit;
 
-                            Map<ReplicationGroupId, String> 
replicationGroupIds = enlistedGroups.entrySet().stream()
+                            Map<TablePartitionId, String> replicationGroupIds 
= enlistedGroups.entrySet().stream()
                                     .collect(Collectors.toMap(
                                             Entry::getKey,
                                             entry -> 
entry.getValue().get1().name()
@@ -617,7 +616,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             HybridTimestampTracker observableTimestampTracker,
             TablePartitionId commitPartition,
             boolean commit,
-            Map<ReplicationGroupId, String> replicationGroupIds,
+            Map<TablePartitionId, String> replicationGroupIds,
             UUID txId,
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
@@ -690,7 +689,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             String primaryConsistentId,
             Long enlistmentConsistencyToken,
             boolean commit,
-            Map<ReplicationGroupId, String> replicationGroupIds,
+            Map<TablePartitionId, String> replicationGroupIds,
             UUID txId,
             HybridTimestamp commitTimestamp,
             CompletableFuture<TransactionMeta> txFinishFuture
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 558cbd3b0d..355958853b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.tx.impl;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -29,10 +32,10 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TransactionResult;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -45,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TxMessageSender {
     /** Tx messages factory. */
-    private static final TxMessagesFactory TX_MESSAGE_FACTORY = new 
TxMessagesFactory();
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
 
     /** Replica messages factory. */
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
@@ -99,7 +102,7 @@ public class TxMessageSender {
     ) {
         return replicaService.invoke(
                 primaryConsistentId,
-                TX_MESSAGE_FACTORY.writeIntentSwitchReplicaRequest()
+                TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
                         
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
                         .timestampLong(clockService.nowLong())
                         .txId(txId)
@@ -113,7 +116,7 @@ public class TxMessageSender {
      * Sends cleanup request to the specified primary replica.
      *
      * @param primaryConsistentId Primary replica to process given cleanup 
request.
-     * @param replicationGroupIds Table partition ids.
+     * @param replicationGroupIds Table partition IDs.
      * @param txId Transaction id.
      * @param commit {@code True} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -121,19 +124,19 @@ public class TxMessageSender {
      */
     public CompletableFuture<NetworkMessage> cleanup(
             String primaryConsistentId,
-            @Nullable Collection<ReplicationGroupId> replicationGroupIds,
+            @Nullable Collection<TablePartitionId> replicationGroupIds,
             UUID txId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
         return messagingService.invoke(
                 primaryConsistentId,
-                TX_MESSAGE_FACTORY.txCleanupMessage()
+                TX_MESSAGES_FACTORY.txCleanupMessage()
                         .txId(txId)
                         .commit(commit)
                         
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
                         .timestampLong(clockService.nowLong())
-                        .groups(replicationGroupIds)
+                        
.groups(toTablePartitionIdMessages(replicationGroupIds))
                         .build(),
                 transactionConfiguration.rpcTimeout().value());
     }
@@ -153,7 +156,7 @@ public class TxMessageSender {
     public CompletableFuture<TransactionResult> finish(
             String primaryConsistentId,
             TablePartitionId commitPartition,
-            Map<ReplicationGroupId, String> replicationGroupIds,
+            Map<TablePartitionId, String> replicationGroupIds,
             UUID txId,
             Long consistencyToken,
             boolean commit,
@@ -161,11 +164,11 @@ public class TxMessageSender {
     ) {
         return replicaService.invoke(
                 primaryConsistentId,
-                TX_MESSAGE_FACTORY.txFinishReplicaRequest()
+                TX_MESSAGES_FACTORY.txFinishReplicaRequest()
                         .txId(txId)
                         .timestampLong(clockService.nowLong())
                         
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartition))
-                        .groups(replicationGroupIds)
+                        
.groups(toTablePartitionIdMessages(replicationGroupIds))
                         .commit(commit)
                         
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
                         .enlistmentConsistencyToken(consistencyToken)
@@ -189,7 +192,7 @@ public class TxMessageSender {
     ) {
         return replicaService.invoke(
                 primaryConsistentId,
-                TX_MESSAGE_FACTORY.txStateCommitPartitionRequest()
+                TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
                         
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitGrpId))
                         .txId(txId)
                         .enlistmentConsistencyToken(consistencyToken)
@@ -211,7 +214,7 @@ public class TxMessageSender {
     ) {
         return messagingService.invoke(
                         primaryConsistentId,
-                        TX_MESSAGE_FACTORY.txStateCoordinatorRequest()
+                        TX_MESSAGES_FACTORY.txStateCoordinatorRequest()
                                 .readTimestampLong(timestamp.longValue())
                                 .txId(txId)
                                 .build(),
@@ -233,7 +236,7 @@ public class TxMessageSender {
     public CompletableFuture<ReplicaResponse> sendRecoveryCleanup(String 
primaryConsistentId, TablePartitionId tablePartitionId) {
         return replicaService.invoke(
                 primaryConsistentId,
-                TX_MESSAGE_FACTORY.txCleanupRecoveryRequest()
+                TX_MESSAGES_FACTORY.txCleanupRecoveryRequest()
                         
.groupId(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId))
                         .build()
         );
@@ -242,4 +245,32 @@ public class TxMessageSender {
     public MessagingService messagingService() {
         return messagingService;
     }
+
+    /**
+     * Converts table partition IDs to network messages.
+     *
+     * @param tablePartitionIds Table partition IDs, may be {@code null}.
+     * @return Messages in the iteration order of the input, or {@code null} if the input is {@code null}.
+     */
+    private static @Nullable List<TablePartitionIdMessage> toTablePartitionIdMessages(
+            @Nullable Collection<TablePartitionId> tablePartitionIds
+    ) {
+        List<TablePartitionIdMessage> result = null;
+
+        if (tablePartitionIds != null) {
+            result = new ArrayList<>(tablePartitionIds.size());
+
+            for (TablePartitionId id : tablePartitionIds) {
+                result.add(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, id));
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Re-keys a map of table partition IDs by the corresponding network messages; values are carried over unchanged.
+     *
+     * @param replicationGroupIds Map keyed by table partition ID.
+     * @return New map keyed by table partition ID message.
+     */
+    private static Map<TablePartitionIdMessage, String> toTablePartitionIdMessages(
+            Map<TablePartitionId, String> replicationGroupIds
+    ) {
+        Map<TablePartitionIdMessage, String> result = new HashMap<>(replicationGroupIds.size());
+
+        replicationGroupIds.forEach(
+                (partitionId, value) -> result.put(toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, partitionId), value)
+        );
+
+        return result;
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
new file mode 100644
index 0000000000..d3984f2a8c
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/CleanupReplicatedInfoMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+
+/** Message for transferring a {@link CleanupReplicatedInfo}. */
+@Transferable(TxMessageGroup.CLEANUP_REPLICATED_INFO_MESSAGE)
+public interface CleanupReplicatedInfoMessage extends NetworkMessage {
+    /** Transaction ID. */
+    UUID txId();
+
+    /** Partitions. */
+    List<TablePartitionIdMessage> partitions();
+
+    /** Converts to {@link CleanupReplicatedInfo}. */
+    default CleanupReplicatedInfo asCleanupReplicatedInfo() {
+        List<TablePartitionIdMessage> partitionMessages = partitions();
+        List<TablePartitionId> partitions = new 
ArrayList<>(partitionMessages.size());
+
+        for (int i = 0; i < partitionMessages.size(); i++) {
+            partitions.add(partitionMessages.get(i).asTablePartitionId());
+        }
+
+        return new CleanupReplicatedInfo(txId(), partitions);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
similarity index 57%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
index 4dedc35220..934f595149 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TransactionMetaMessage.java
@@ -15,34 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.network.raft;
+package org.apache.ignite.internal.tx.message;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
-import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxState;
 import org.jetbrains.annotations.Nullable;
 
-/** Message for transferring a {@link TxMeta}. */
-@Transferable(PartitionReplicationMessageGroup.TX_META_MESSAGE)
-public interface TxMetaMessage extends NetworkMessage {
+/** Message for transferring a {@link TransactionMeta}. */
+public interface TransactionMetaMessage extends NetworkMessage {
     /** Ordinal of {@link TxState} value. */
     int txStateInt();
 
     /** Commit timestamp in primitive representation, {@link 
HybridTimestamp#NULL_HYBRID_TIMESTAMP} as {@code null}. */
     long commitTimestampLong();
 
-    /** List of enlisted partition groups. */
-    List<TablePartitionIdMessage> enlistedPartitions();
-
     /** Transaction state. */
     default TxState txState() {
         TxState state = TxState.fromOrdinal(txStateInt());
@@ -57,15 +47,8 @@ public interface TxMetaMessage extends NetworkMessage {
         return nullableHybridTimestamp(commitTimestampLong());
     }
 
-    /** Converts to {@link TxMeta}. */
-    default TxMeta asTxMeta() {
-        List<TablePartitionIdMessage> enlistedPartitionMessages = 
enlistedPartitions();
-        var enlistedPartitions = new 
ArrayList<TablePartitionId>(enlistedPartitionMessages.size());
-
-        for (int i = 0; i < enlistedPartitionMessages.size(); i++) {
-            
enlistedPartitions.add(enlistedPartitionMessages.get(i).asTablePartitionId());
-        }
-
-        return new TxMeta(txState(), enlistedPartitions, commitTimestamp());
+    /** Converts to {@link TransactionMeta}; this default always throws {@link AssertionError}, sub-interfaces must override. */
+    default TransactionMeta asTransactionMeta() {
+        throw new AssertionError("Must be implemented by heirs.");
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
index 605fa2066d..138d9be809 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessage.java
@@ -19,12 +19,11 @@ package org.apache.ignite.internal.tx.message;
 
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 
-import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.jetbrains.annotations.Nullable;
 
@@ -46,9 +45,7 @@ public interface TxCleanupMessage extends TimestampAware {
      *
      * @return Replication groups aggregated by expected primary replica nodes.
      */
-    @Marshallable
-    @Nullable
-    Collection<ReplicationGroupId> groups();
+    @Nullable List<TablePartitionIdMessage> groups();
 
     /**
      * Returns {@code True} if a commit request.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
index 39cbd7aae9..ef872a50b5 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.jetbrains.annotations.Nullable;
@@ -27,8 +26,6 @@ import org.jetbrains.annotations.Nullable;
  */
 @Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
 public interface TxCleanupMessageResponse extends TimestampAware {
-
-    @Nullable
-    @Marshallable
-    CleanupReplicatedInfo result();
+    /** Result of a replicated cleanup request. */
+    @Nullable CleanupReplicatedInfoMessage result();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index a663f1e8a5..abecb5159f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -22,10 +22,9 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimes
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.jetbrains.annotations.Nullable;
 
@@ -66,11 +65,6 @@ public interface TxFinishReplicaRequest extends 
PrimaryReplicaRequest, Timestamp
         return nullableHybridTimestamp(commitTimestampLong());
     }
 
-    /**
-     * Returns enlisted partition groups aggregated by expected primary 
replica nodes.
-     *
-     * @return Enlisted partition groups aggregated by expected primary 
replica nodes.
-     */
-    @Marshallable
-    Map<ReplicationGroupId, String> groups();
+    /** Enlisted partition groups aggregated by expected primary replica 
nodes. */
+    Map<TablePartitionIdMessage, String> groups();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index 68fd6ccfdc..338702389a 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -94,4 +94,18 @@ public class TxMessageGroup {
      */
     public static final short VACUUM_TX_STATE_COMMAND = 13;
 
+    /** Message type for {@link TxMetaMessage}. */
+    public static final short TX_META_MESSAGE = 14;
+
+    /** Message type for {@link TxStateMetaMessage}. */
+    public static final short TX_STATE_META_MESSAGE = 15;
+
+    /** Message type for {@link TxStateMetaAbandonedMessage}. */
+    public static final short TX_STATE_META_ABANDONED_MESSAGE = 16;
+
+    /** Message type for {@link TxStateMetaFinishingMessage}. */
+    public static final short TX_STATE_META_FINISHING_MESSAGE = 17;
+
+    /** Message type for {@link CleanupReplicatedInfoMessage}. */
+    public static final short CLEANUP_REPLICATED_INFO_MESSAGE = 18;
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
similarity index 60%
rename from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
rename to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
index 4dedc35220..fc9f3a333b 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/raft/TxMetaMessage.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMetaMessage.java
@@ -15,48 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.network.raft;
-
-import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+package org.apache.ignite.internal.tx.message;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxMeta;
-import org.apache.ignite.internal.tx.TxState;
-import org.jetbrains.annotations.Nullable;
 
 /** Message for transferring a {@link TxMeta}. */
-@Transferable(PartitionReplicationMessageGroup.TX_META_MESSAGE)
-public interface TxMetaMessage extends NetworkMessage {
-    /** Ordinal of {@link TxState} value. */
-    int txStateInt();
-
-    /** Commit timestamp in primitive representation, {@link 
HybridTimestamp#NULL_HYBRID_TIMESTAMP} as {@code null}. */
-    long commitTimestampLong();
-
+@Transferable(TxMessageGroup.TX_META_MESSAGE)
+public interface TxMetaMessage extends TransactionMetaMessage {
     /** List of enlisted partition groups. */
     List<TablePartitionIdMessage> enlistedPartitions();
 
-    /** Transaction state. */
-    default TxState txState() {
-        TxState state = TxState.fromOrdinal(txStateInt());
-
-        assert state != null : txStateInt();
-
-        return state;
-    }
-
-    /** Commit timestamp. */
-    default @Nullable HybridTimestamp commitTimestamp() {
-        return nullableHybridTimestamp(commitTimestampLong());
-    }
-
     /** Converts to {@link TxMeta}. */
     default TxMeta asTxMeta() {
         List<TablePartitionIdMessage> enlistedPartitionMessages = 
enlistedPartitions();
@@ -68,4 +42,9 @@ public interface TxMetaMessage extends NetworkMessage {
 
         return new TxMeta(txState(), enlistedPartitions, commitTimestamp());
     }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        return asTxMeta();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
similarity index 51%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
index 5d5ce4bb2a..432ffbd98e 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaAbandonedMessage.java
@@ -17,18 +17,29 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.tx.TransactionMeta;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.TxStateMetaAbandoned;
 
-/**
- * Transaction state response.
- */
-@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
-public interface TxStateResponse extends TimestampAware {
-    @Marshallable
-    @Nullable
-    TransactionMeta txStateMeta();
+/** Message for transferring a {@link TxStateMetaAbandoned}. */
+@Transferable(TxMessageGroup.TX_STATE_META_ABANDONED_MESSAGE)
+public interface TxStateMetaAbandonedMessage extends TxStateMetaMessage {
+    /** Timestamp when the latest {@code ABANDONED} state was set. */
+    long lastAbandonedMarkerTs();
+
+    /** Converts to {@link TxStateMetaAbandoned}. NOTE(review): {@link #lastAbandonedMarkerTs()} is not used here; confirm. */
+    default TxStateMetaAbandoned asTxStateMetaAbandoned() {
+        TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+        return new TxStateMetaAbandoned(
+                txCoordinatorId(),
+                commitPartitionId == null ? null : 
commitPartitionId.asTablePartitionId()
+        );
+    }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        return asTxStateMetaAbandoned();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
similarity index 54%
copy from 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
copy to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
index 5d5ce4bb2a..16ba5c6ea9 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaFinishingMessage.java
@@ -17,18 +17,26 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
 import org.apache.ignite.internal.tx.TransactionMeta;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 
-/**
- * Transaction state response.
- */
-@Transferable(TxMessageGroup.TX_STATE_RESPONSE)
-public interface TxStateResponse extends TimestampAware {
-    @Marshallable
-    @Nullable
-    TransactionMeta txStateMeta();
+/** Message for transferring a {@link TxStateMetaFinishing}. */
+@Transferable(TxMessageGroup.TX_STATE_META_FINISHING_MESSAGE)
+public interface TxStateMetaFinishingMessage extends TxStateMetaMessage {
+    /** Converts to {@link TxStateMetaFinishing}. */
+    default TxStateMetaFinishing asTxStateMetaFinishing() {
+        TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+        return new TxStateMetaFinishing(
+                txCoordinatorId(),
+                commitPartitionId == null ? null : 
commitPartitionId.asTablePartitionId()
+        );
+    }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        return asTxStateMetaFinishing();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
new file mode 100644
index 0000000000..48285fb635
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateMetaMessage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/** Message for transferring a {@link TxStateMeta}. */
+@Transferable(TxMessageGroup.TX_STATE_META_MESSAGE)
+public interface TxStateMetaMessage extends TransactionMetaMessage {
+    /** Transaction coordinator ID. */
+    @Nullable String txCoordinatorId();
+
+    /** ID of the replication group that manages a transaction state. */
+    @Nullable TablePartitionIdMessage commitPartitionId();
+
+    /** Initial vacuum observation timestamp. */
+    @Nullable Long initialVacuumObservationTimestamp();
+
+    /** Cleanup completion timestamp. */
+    @Nullable Long cleanupCompletionTimestamp();
+
+    /** Converts to {@link TxStateMeta}. */
+    default TxStateMeta asTxStateMeta() {
+        TablePartitionIdMessage commitPartitionId = commitPartitionId();
+
+        return new TxStateMeta(
+                txState(),
+                txCoordinatorId(),
+                commitPartitionId == null ? null : 
commitPartitionId.asTablePartitionId(),
+                commitTimestamp(),
+                initialVacuumObservationTimestamp(),
+                cleanupCompletionTimestamp()
+        );
+    }
+
+    @Override
+    default TransactionMeta asTransactionMeta() {
+        return asTxStateMeta();
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
index 5d5ce4bb2a..20c231e0de 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateResponse.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.tx.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.tx.TransactionMeta;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -28,7 +26,6 @@ import org.jetbrains.annotations.Nullable;
  */
 @Transferable(TxMessageGroup.TX_STATE_RESPONSE)
 public interface TxStateResponse extends TimestampAware {
-    @Marshallable
-    @Nullable
-    TransactionMeta txStateMeta();
+    /** Transaction metadata. */
+    @Nullable TransactionMetaMessage txStateMeta();
 }

Reply via email to