This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 51d25d217a IGNITE-17886 Table commands migrated to use 
NetworkMessage-based code generation infrastructure
51d25d217a is described below

commit 51d25d217af468d9611ee2536d929a017506b9a6
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
AuthorDate: Fri Nov 18 17:19:21 2022 +0300

    IGNITE-17886 Table commands migrated to use NetworkMessage-based code 
generation infrastructure
    
    Signed-off-by: Ivan Bessonov <bessonov...@gmail.com>
---
 .../ignite/distributed/ItTablePersistenceTest.java |   3 +-
 .../distributed/ItTxDistributedTestSingleNode.java |   3 +-
 .../ignite/internal/table/ItColocationTest.java    |  49 ++++--
 .../internal/table/distributed/TableManager.java   |   6 +-
 .../table/distributed/TableMessageGroup.java       |  29 ++++
 .../table/distributed/command/FinishTxCommand.java |  56 +------
 ...ionCommand.java => HybridTimestampMessage.java} |  34 ++--
 .../distributed/command/PartitionCommand.java      |  24 +--
 ...onCommand.java => TablePartitionIdMessage.java} |  33 ++--
 .../distributed/command/TxCleanupCommand.java      |  42 +----
 .../distributed/command/UpdateAllCommand.java      | 106 ++-----------
 .../table/distributed/command/UpdateCommand.java   |  92 ++---------
 .../table/distributed/raft/PartitionListener.java  |  44 ++++--
 .../replicator/PartitionReplicaListener.java       | 176 +++++++++++++++++----
 .../PartitionRaftCommandsSerializationTest.java    | 121 ++++++++++----
 .../raft/PartitionCommandListenerTest.java         | 156 +++++++++++++++---
 .../table/impl/DummyInternalTableImpl.java         |   3 +-
 17 files changed, 522 insertions(+), 455 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d74a09714d..6a4601d466 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -274,7 +274,8 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
                             new TestPartitionDataStorage(testMpPartStorage),
                             new TestTxStateStorage(),
                             txManager,
-                            Map::of
+                            Map::of,
+                            0
                     );
 
                     paths.put(listener, workDir);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 50c6109df6..efe23949f4 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -432,7 +432,8 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                     new 
TestPartitionDataStorage(testMpPartStorage),
                                     new TestTxStateStorage(),
                                     txManagers.get(node),
-                                    () -> Map.of(pkStorage.get().id(), 
pkStorage.get())
+                                    () -> Map.of(pkStorage.get().id(), 
pkStorage.get()),
+                                    partId
                             );
                         },
                         RaftGroupOptions.defaults()
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 23312776c4..5cf66004ef 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -56,8 +56,8 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
@@ -67,8 +67,8 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
@@ -78,6 +78,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.tx.Timestamp;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -90,6 +91,7 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -113,6 +115,9 @@ public class ItColocationTest {
     /** Map of the Raft commands are set by table operation. */
     private static final Int2ObjectMap<Set<Command>> CMDS_MAP = new 
Int2ObjectOpenHashMap<>();
 
+    /** Message factory to create messages - RAFT commands.  */
+    private static final TableMessagesFactory MSG_FACTORY = new 
TableMessagesFactory();
+
     private SchemaDescriptor schema;
 
     private SchemaRegistry schemaRegistry;
@@ -184,20 +189,34 @@ public class ItColocationTest {
             RaftGroupService r = groupRafts.get(request.groupId());
 
             if (request instanceof ReadWriteMultiRowReplicaRequest) {
-                Map<RowId, BinaryRow> rows = 
((ReadWriteMultiRowReplicaRequest) request).binaryRows()
+                Map<UUID, ByteString> rows = 
((ReadWriteMultiRowReplicaRequest) request).binaryRows()
                         .stream()
-                        .collect(toMap(row -> new RowId(0), row -> row));
-
-                return r.run(new UpdateAllCommand(commitPartId, rows, 
UUID.randomUUID()));
+                        .collect(toMap(
+                                row -> Timestamp.nextVersion().toUuid(), row 
-> new ByteString(row.byteBuffer())));
+
+                return r.run(MSG_FACTORY.updateAllCommand()
+                                
.tablePartitionId(MSG_FACTORY.tablePartitionIdMessage()
+                                        .tableId(commitPartId.getTableId())
+                                        .partitionId(commitPartId.getPartId())
+                                        .build()
+                                )
+                            .rowsToUpdate(rows)
+                            .txId(UUID.randomUUID())
+                            .build());
             } else {
                 assertThat(request, 
is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));
 
-                return r.run(new UpdateCommand(
-                        commitPartId,
-                        new RowId(0),
-                        ((ReadWriteSingleRowReplicaRequest) 
request).binaryRow(),
-                        UUID.randomUUID())
-                );
+                return r.run(MSG_FACTORY.updateCommand()
+                        .tablePartitionId(
+                                MSG_FACTORY.tablePartitionIdMessage()
+                                        .tableId(commitPartId.getTableId())
+                                        .partitionId(commitPartId.getPartId())
+                                        .build()
+                        )
+                        .rowUuid(Timestamp.nextVersion().toUuid())
+                        .rowBuffer(new 
ByteString(((ReadWriteSingleRowReplicaRequest) 
request).binaryRow().byteBuffer()))
+                        .txId(UUID.randomUUID())
+                        .build());
             }
         });
 
@@ -366,10 +385,10 @@ public class ItColocationTest {
 
         CMDS_MAP.forEach((p, set) -> {
             UpdateAllCommand cmd = (UpdateAllCommand) 
CollectionUtils.first(set);
-            assertEquals(partsMap.get(p), cmd.getRowsToUpdate().size(), () -> 
"part=" + p + ", set=" + set);
+            assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> 
"part=" + p + ", set=" + set);
 
-            cmd.getRowsToUpdate().values().forEach(binRow -> {
-                Row r = new Row(schema, binRow);
+            cmd.rowsToUpdate().values().forEach(byteStr -> {
+                Row r = new Row(schema, new 
ByteBufferRow(byteStr.toByteArray()));
 
                 assertEquals(INT_TABLE.partition(r), p);
             });
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ff0fd9a51d..e6f7d1663a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -776,7 +776,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                                     
partitionDataStorage(mvPartitionStorage, internalTbl, partId),
                                                                     
txStatePartitionStorage,
                                                                     txManager,
-                                                                    
table.indexStorageAdapters(partId)
+                                                                    
table.indexStorageAdapters(partId),
+                                                                    partId
                                                             ),
                                                             new 
RebalanceRaftGroupEventsListener(
                                                                     
metaStorageMgr,
@@ -1824,7 +1825,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                     partitionDataStorage(mvPartitionStorage, 
internalTable, partId),
                                     txStatePartitionStorage,
                                     txManager,
-                                    tbl.indexStorageAdapters(partId)
+                                    tbl.indexStorageAdapters(partId),
+                                    partId
                             );
 
                             RaftGroupEventsListener raftGrpEvtsLsnr = new 
RebalanceRaftGroupEventsListener(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 3eb11ed25f..2a4f6f96c8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -17,6 +17,12 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
@@ -125,4 +131,27 @@ public interface TableMessageGroup {
      * Message type for {@link SnapshotTxDataResponse}.
      */
     short SNAPSHOT_TX_DATA_RESPONSE = 16;
+
+    /**
+     * Message types for Table module RAFT commands.
+     */
+    public interface Commands {
+        /** Message type for {@link FinishTxCommand}. */
+        short FINISH_TX = 40;
+
+        /** Message type for {@link TxCleanupCommand}. */
+        short TX_CLEANUP = 41;
+
+        /** Message type for {@link UpdateAllCommand}. */
+        short UPDATE_ALL = 42;
+
+        /** Message type for {@link UpdateCommand}. */
+        short UPDATE = 43;
+
+        /** Message type for {@link HybridTimestampMessage}. */
+        short HYBRID_TIMESTAMP = 60;
+
+        /** Message type for {@link TablePartitionIdMessage}. */
+        short TABLE_PARTITION_ID = 61;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
index f640f2c33f..77a7f789e2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
@@ -18,68 +18,26 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * State machine command to finish a transaction on a commit or a rollback.
  */
-public class FinishTxCommand extends PartitionCommand {
-    /**
-     * A commit or a rollback state.
-     */
-    private final boolean commit;
-
-    /**
-     * Transaction commit timestamp.
-     */
-    private final HybridTimestamp commitTimestamp;
-
-    /**
-     * Replication groups ids.
-     */
-    private final List<ReplicationGroupId> replicationGroupIds;
-
-    /**
-     * The constructor.
-     *
-     * @param txId The txId.
-     * @param commit Commit or rollback state {@code True} to commit.
-     * @param commitTimestamp Transaction commit timestamp.
-     * @param replicationGroupIds Set of replication groups ids.
-     */
-    public FinishTxCommand(UUID txId, boolean commit, HybridTimestamp 
commitTimestamp, List<ReplicationGroupId> replicationGroupIds) {
-        super(txId);
-        this.commit = commit;
-        this.commitTimestamp = commitTimestamp;
-        this.replicationGroupIds = replicationGroupIds;
-    }
-
+@Transferable(TableMessageGroup.Commands.FINISH_TX)
+public interface FinishTxCommand extends PartitionCommand {
     /**
      * Returns a commit or a rollback state.
-     *
-     * @return A commit or a rollback state.
      */
-    public boolean commit() {
-        return commit;
-    }
+    boolean commit();
 
     /**
      * Returns a transaction commit timestamp.
-     *
-     * @return A transaction commit timestamp.
      */
-    public HybridTimestamp commitTimestamp() {
-        return commitTimestamp;
-    }
+    HybridTimestampMessage commitTimestamp();
 
     /**
      * Returns an ordered replication groups ids.
-     *
-     * @return An ordered replication groups ids.
      */
-    public List<ReplicationGroupId> replicationGroupIds() {
-        return replicationGroupIds;
-    }
+    List<TablePartitionIdMessage> tablePartitionIds();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
similarity index 58%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
index a039fe3d0c..b69545d1c5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java
@@ -17,32 +17,22 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.UUID;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Partition transactional command.
+ * Interface to represent {@link HybridTimestamp} as a {@link NetworkMessage}.
  */
-public abstract class PartitionCommand implements WriteCommand {
-    /** Transaction id. */
-    private UUID txId;
+@Transferable(TableMessageGroup.Commands.HYBRID_TIMESTAMP)
+public interface HybridTimestampMessage extends NetworkMessage, Serializable {
+    long physical();
 
-    /**
-     * The constructor.
-     *
-     * @param txId Transaction id.
-     */
-    public PartitionCommand(@NotNull UUID txId) {
-        this.txId = txId;
-    }
+    int logical();
 
-    /**
-     * Gets a transaction id.
-     *
-     * @return Transaction id.
-     */
-    public UUID txId() {
-        return txId;
+    default HybridTimestamp asHybridTimestamp() {
+        return new HybridTimestamp(physical(), logical());
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
index a039fe3d0c..555f3f4f6b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
@@ -18,31 +18,15 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Partition transactional command.
  */
-public abstract class PartitionCommand implements WriteCommand {
-    /** Transaction id. */
-    private UUID txId;
-
-    /**
-     * The constructor.
-     *
-     * @param txId Transaction id.
-     */
-    public PartitionCommand(@NotNull UUID txId) {
-        this.txId = txId;
-    }
-
+public interface PartitionCommand extends WriteCommand, NetworkMessage {
     /**
-     * Gets a transaction id.
-     *
-     * @return Transaction id.
+     * Returns a transaction id.
      */
-    public UUID txId() {
-        return txId;
-    }
+    UUID txId();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java
similarity index 57%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java
index a039fe3d0c..9514efec93 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java
@@ -17,32 +17,23 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
+import java.io.Serializable;
 import java.util.UUID;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup.Commands;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Partition transactional command.
+ * Network Message interface to transfer {@link TablePartitionId} as part of 
{@link NetworkMessage}.
  */
-public abstract class PartitionCommand implements WriteCommand {
-    /** Transaction id. */
-    private UUID txId;
+@Transferable(Commands.TABLE_PARTITION_ID)
+public interface TablePartitionIdMessage extends NetworkMessage, Serializable {
+    UUID tableId();
 
-    /**
-     * The constructor.
-     *
-     * @param txId Transaction id.
-     */
-    public PartitionCommand(@NotNull UUID txId) {
-        this.txId = txId;
-    }
+    int partitionId();
 
-    /**
-     * Gets a transaction id.
-     *
-     * @return Transaction id.
-     */
-    public UUID txId() {
-        return txId;
+    default TablePartitionId asTablePartitionId() {
+        return new TablePartitionId(tableId(), partitionId());
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
index 9a8af04bd5..f336faaea2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java
@@ -17,51 +17,21 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
  * State machine command to cleanup on a transaction commit.
  */
-public class TxCleanupCommand extends PartitionCommand {
-    /**
-     * A commit or a rollback state.
-     */
-    private final boolean commit;
-
-    /**
-     * Transaction commit timestamp.
-     */
-    private final HybridTimestamp commitTimestamp;
-
-    /**
-     * The constructor.
-     *
-     * @param txId The txId.
-     * @param commit Commit or rollback state {@code True} to commit.
-     * @param commitTimestamp Transaction commit timestamp.
-     */
-    public TxCleanupCommand(UUID txId, boolean commit, HybridTimestamp 
commitTimestamp) {
-        super(txId);
-        this.commit = commit;
-        this.commitTimestamp = commitTimestamp;
-    }
-
+@Transferable(TableMessageGroup.Commands.TX_CLEANUP)
+public interface TxCleanupCommand extends PartitionCommand {
     /**
      * Returns a commit or a rollback state.
-     *
-     * @return A commit or a rollback state.
      */
-    public boolean commit() {
-        return commit;
-    }
+    boolean commit();
 
     /**
      * Returns a transaction commit timestamp.
-     *
-     * @return A transaction commit timestamp.
      */
-    public HybridTimestamp commitTimestamp() {
-        return commitTimestamp;
-    }
+    HybridTimestampMessage commitTimestamp();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 229cb99705..46e112f05c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,108 +17,20 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.storage.RowId;
-import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
-import org.apache.ignite.internal.util.CollectionUtils;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.util.ByteString;
 
 /**
  * State machine command for updating a batch of entries.
  */
-public class UpdateAllCommand extends PartitionCommand {
-    /** Committed table partition id. */
-    private final TablePartitionId commitReplicationGroupId;
+@Transferable(TableMessageGroup.Commands.UPDATE_ALL)
+public interface UpdateAllCommand extends PartitionCommand {
+    TablePartitionIdMessage tablePartitionId();
 
-    /** Rows to update. */
-    private transient HashMap<RowId, BinaryRow> rowsToUpdate;
-
-    /** Bytes representation of a row to update map. */
-    private byte[] rowsToUpdateBytes;
-
-    /**
-     * Constructor for batch remove.
-     *
-     * @param commitReplicationGroupId Committed table partition id.
-     * @param removeRows Ids to remove.
-     * @param txId Transaction id.
-     */
-    public UpdateAllCommand(@NotNull TablePartitionId 
commitReplicationGroupId, Collection<RowId> removeRows, @NotNull UUID txId) {
-        this(commitReplicationGroupId, removeRows, null, txId);
-    }
-
-    /**
-     * Constructor for a batch update.
-     *
-     * @param commitReplicationGroupId Committed table partition id.
-     * @param rowsToUpdate Rows to update or insert.
-     * @param txId Transaction id.
-     */
-    public UpdateAllCommand(@NotNull TablePartitionId 
commitReplicationGroupId, Map<RowId, BinaryRow> rowsToUpdate, @NotNull UUID 
txId) {
-        this(commitReplicationGroupId, null, rowsToUpdate, txId);
-    }
-
-    /**
-     * The constructor.
-     *
-     * @param commitReplicationGroupId Committed table partition id.
-     * @param removeRows Ids to remove.
-     * @param rowsToUpdate Rows to update or insert.
-     * @param txId Transaction id.
-     */
-    private UpdateAllCommand(
-            @NotNull TablePartitionId commitReplicationGroupId,
-            Collection<RowId> removeRows,
-            Map<RowId, BinaryRow> rowsToUpdate,
-            @NotNull UUID txId
-    ) {
-        super(txId);
-
-        this.commitReplicationGroupId = commitReplicationGroupId;
-
-        int size = (removeRows == null ? 0 : removeRows.size()) + 
(rowsToUpdate == null ? 0 : rowsToUpdate.size());
-
-        HashMap<RowId, BinaryRow> rows = new HashMap<>(size);
-
-        if (!CollectionUtils.nullOrEmpty(removeRows)) {
-            removeRows.forEach(rowId -> rows.put(rowId, null));
-        }
-
-        if (!CollectionUtils.nullOrEmpty(rowsToUpdate)) {
-            rows.putAll(rowsToUpdate);
-        }
-
-        this.rowsToUpdate = rows;
-
-        rowsToUpdateBytes = CommandUtils.rowMapToBytes(rows);
-    }
-
-    /**
-     * Gets a table partition id that the commit partition.
-     *
-     * @return Table partition id.
-     */
-    public TablePartitionId getReplicationGroupId() {
-        return commitReplicationGroupId;
-    }
-
-
-    /**
-     * Gets rows to update.
-     *
-     * @return Rows to update map.
-     */
-    public HashMap<RowId, BinaryRow> getRowsToUpdate() {
-        if (rowsToUpdate == null) {
-            rowsToUpdate = new HashMap<>();
-
-            CommandUtils.readRowMap(rowsToUpdateBytes, rowsToUpdate::put);
-        }
-
-        return rowsToUpdate;
-    }
+    @Marshallable
+    Map<UUID, ByteString> rowsToUpdate();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 719db8e50e..468a5c8f4d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -18,92 +18,20 @@
 package org.apache.ignite.internal.table.distributed.command;
 
 import java.util.UUID;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.storage.RowId;
-import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.util.ByteString;
 
 /**
  * State machine command to update a row specified by a row id.
  */
-public class UpdateCommand extends PartitionCommand {
-    /** Committed table partition id. */
-    private final TablePartitionId commitReplicationGroupId;
+@Transferable(TableMessageGroup.Commands.UPDATE)
+public interface UpdateCommand extends PartitionCommand {
+    TablePartitionIdMessage tablePartitionId();
 
-    /** Id of a row that will be updated. */
-    private final RowId rowId;
+    UUID rowUuid();
 
-    /** Binary row. */
-    private transient BinaryRow row;
-
-    /** Row bytes. */
-    private byte[] rowBytes;
-
-    /**
-     * Creates a new instance of UpdateCommand with the given row to be 
updated. The {@code rowId} should not be {@code null}.
-     *
-     * @param commitReplicationGroupId Committed table partition id.
-     * @param rowId Row id.
-     * @param row   Binary row.
-     * @param txId  The transaction id.
-     * @see PartitionCommand
-     */
-    public UpdateCommand(
-            @NotNull TablePartitionId commitReplicationGroupId,
-            @NotNull RowId rowId,
-            @Nullable BinaryRow row,
-            @NotNull UUID txId
-    ) {
-        super(txId);
-
-        this.commitReplicationGroupId = commitReplicationGroupId;
-        this.rowId = rowId;
-        this.row = row;
-
-        this.rowBytes = CommandUtils.rowToBytes(row);
-    }
-
-    /**
-     * Constructor for remove operation.
-     *
-     * @param commitReplicationGroupId Committed table partition id.
-     * @param rowId Row id.
-     * @param txId Transaction id.
-     */
-    public UpdateCommand(@NotNull TablePartitionId commitReplicationGroupId, 
@NotNull RowId rowId, @NotNull UUID txId) {
-        this(commitReplicationGroupId, rowId, null, txId);
-    }
-
-    /**
-     * Gets a table partition id that the commit partition.
-     *
-     * @return Table partition id.
-     */
-    public TablePartitionId getCommitReplicationGroupId() {
-        return commitReplicationGroupId;
-    }
-
-    /**
-     * Gets a row id that will be update.
-     *
-     * @return Row id.
-     */
-    public RowId getRowId() {
-        return rowId;
-    }
-
-    /**
-     * Gets a binary row.
-     *
-     * @return Binary key.
-     */
-    public BinaryRow getRow() {
-        if (row == null && rowBytes != null) {
-            row = new ByteBufferRow(rowBytes);
-        }
-
-        return row;
-    }
+    @Marshallable
+    ByteString rowBuffer();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index f25117ec10..5bb70b9787 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -34,15 +34,18 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.RowId;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
@@ -57,6 +60,7 @@ import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.CommittedConfiguration;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.jraft.util.ByteString;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -78,6 +82,9 @@ public class PartitionListener implements RaftGroupListener {
 
     private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
 
+    /** Partition ID. */
+    private int partitionId;
+
     /** Rows that were inserted, updated or removed. */
     private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
 
@@ -86,17 +93,20 @@ public class PartitionListener implements RaftGroupListener 
{
      *
      * @param partitionDataStorage  The storage.
      * @param txManager Transaction manager.
+     * @param partitionId Partition ID this listener serves.
      */
     public PartitionListener(
             PartitionDataStorage partitionDataStorage,
             TxStateStorage txStateStorage,
             TxManager txManager,
-            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes
+            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
+            int partitionId
     ) {
         this.storage = partitionDataStorage;
         this.txStateStorage = txStateStorage;
         this.txManager = txManager;
         this.indexes = indexes;
+        this.partitionId = partitionId;
     }
 
     @Override
@@ -174,11 +184,12 @@ public class PartitionListener implements 
RaftGroupListener {
         }
 
         storage.runConsistently(() -> {
-            BinaryRow row = cmd.getRow();
-            RowId rowId = cmd.getRowId();
+            BinaryRow row = cmd.rowBuffer() != null ? new 
ByteBufferRow(cmd.rowBuffer().toByteArray()) : null;
+            UUID rowUuid = cmd.rowUuid();
+            RowId rowId = new RowId(partitionId, rowUuid);
             UUID txId = cmd.txId();
-            UUID commitTblId = cmd.getCommitReplicationGroupId().getTableId();
-            int commitPartId = cmd.getCommitReplicationGroupId().getPartId();
+            UUID commitTblId = cmd.tablePartitionId().tableId();
+            int commitPartId = cmd.tablePartitionId().partitionId();
 
             storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
 
@@ -207,14 +218,14 @@ public class PartitionListener implements 
RaftGroupListener {
 
         storage.runConsistently(() -> {
             UUID txId = cmd.txId();
-            Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate();
-            UUID commitTblId = cmd.getReplicationGroupId().getTableId();
-            int commitPartId = cmd.getReplicationGroupId().getPartId();
+            Map<UUID, ByteString> rowsToUpdate = cmd.rowsToUpdate();
+            UUID commitTblId = cmd.tablePartitionId().tableId();
+            int commitPartId = cmd.tablePartitionId().partitionId();
 
             if (!nullOrEmpty(rowsToUpdate)) {
-                for (Map.Entry<RowId, BinaryRow> entry : 
rowsToUpdate.entrySet()) {
-                    RowId rowId = entry.getKey();
-                    BinaryRow row = entry.getValue();
+                for (Map.Entry<UUID, ByteString> entry : 
rowsToUpdate.entrySet()) {
+                    RowId rowId = new RowId(partitionId, entry.getKey());
+                    BinaryRow row = entry.getValue() != null ? new 
ByteBufferRow(entry.getValue().toByteArray()) : null;
 
                     storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
 
@@ -246,12 +257,15 @@ public class PartitionListener implements 
RaftGroupListener {
 
         UUID txId = cmd.txId();
 
-        TxState stateToSet = cmd.commit() ? TxState.COMMITED : TxState.ABORTED;
+        TxState stateToSet = cmd.commit() ? COMMITED : ABORTED;
 
         TxMeta txMetaToSet = new TxMeta(
                 stateToSet,
-                cmd.replicationGroupIds(),
-                cmd.commitTimestamp()
+                cmd.tablePartitionIds()
+                        .stream()
+                        .map(TablePartitionIdMessage::asTablePartitionId)
+                        .collect(Collectors.toList()),
+                cmd.commitTimestamp() != null ? 
cmd.commitTimestamp().asHybridTimestamp() : null
         );
 
         TxMeta txMetaBeforeCas = txStateStorage.get(txId);
@@ -305,7 +319,7 @@ public class PartitionListener implements RaftGroupListener 
{
             Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, 
Collections.emptySet());
 
             if (cmd.commit()) {
-                pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
cmd.commitTimestamp()));
+                pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
cmd.commitTimestamp().asHybridTimestamp()));
             } else {
                 pendingRowIds.forEach(storage::abortWrite);
             }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 136f79482f..6d6a9bf306 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -24,11 +24,14 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -65,11 +68,15 @@ import 
org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
-import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
+import 
org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder;
+import 
org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage;
+import 
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import 
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -103,11 +110,15 @@ import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.util.ByteString;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /** Partition replication listener. */
 public class PartitionReplicaListener implements ReplicaListener {
+    /** Factory to create RAFT command messages. */
+    private final TableMessagesFactory msgFactory = new TableMessagesFactory();
+
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
@@ -923,7 +934,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     // TODO: need to properly handle primary replica changes 
https://issues.apache.org/jira/browse/IGNITE-17615
     private CompletableFuture<Void> 
processTxFinishAction(TxFinishReplicaRequest request) {
         List<ReplicationGroupId> aggregatedGroupIds = 
request.groups().values().stream()
-                
.flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList());
+                .flatMap(List::stream)
+                .map(IgniteBiTuple::get1)
+                .collect(Collectors.toList());
 
         UUID txId = request.txId();
 
@@ -967,14 +980,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         HybridTimestamp commitTimestamp = commit ? hybridClock.now() : null;
 
-        CompletableFuture<Object> changeStateFuture = raftClient.run(
-                new FinishTxCommand(
-                        txId,
-                        commit,
-                        commitTimestamp,
-                        aggregatedGroupIds
-                )
-        ).whenComplete((o, throwable) -> {
+        FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand()
+                .txId(txId)
+                .commit(commit)
+                .tablePartitionIds(aggregatedGroupIds.stream()
+                        .map(rgId -> tablePartitionId((TablePartitionId) 
rgId)).collect(Collectors.toList()));
+
+        if (commit) {
+            finishTxCmdBldr.commitTimestamp(hybridTimestamp(commitTimestamp));
+        }
+
+        CompletableFuture<Object> changeStateFuture = 
raftClient.run(finishTxCmdBldr.build()).whenComplete((o, throwable) -> {
             fut.complete(new TxMeta(commit ? TxState.COMMITED : 
TxState.ABORTED, aggregatedGroupIds, commitTimestamp));
 
             txTimestampUpdateMap.remove(txId);
@@ -1004,8 +1020,16 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return failedFuture(e);
         }
 
+        HybridTimestampMessage timestampMsg = 
hybridTimestamp(request.commitTimestamp());
+
+        TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand()
+                .txId(request.txId())
+                .commit(request.commit())
+                .commitTimestamp(timestampMsg)
+                .build();
+
         return raftClient
-                .run(new TxCleanupCommand(request.txId(), request.commit(), 
request.commitTimestamp()))
+                .run(txCleanupCmd)
                 .thenRun(() -> 
lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
     }
 
@@ -1154,7 +1178,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(rowIdLockFuts).thenCompose(ignore -> {
-                    Collection<RowId> rowIdsToDelete = new ArrayList<>();
+                    Map<UUID, ByteString> rowIdsToDelete = new HashMap<>();
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     int futNum = 0;
@@ -1163,7 +1187,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         RowId lockedRowId = rowIdLockFuts[futNum++].join();
 
                         if (lockedRowId != null) {
-                            rowIdsToDelete.add(lockedRowId);
+                            rowIdsToDelete.put(lockedRowId.uuid(), null);
                         } else {
                             result.add(row);
                         }
@@ -1173,7 +1197,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         return completedFuture(result);
                     }
 
-                    return applyCmdWithExceptionHandling(new 
UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId))
+                    return 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowIdsToDelete, txId))
                             .thenApply(ignored -> result);
                 });
             }
@@ -1193,7 +1217,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(deleteExactLockFuts).thenCompose(ignore -> {
-                    Collection<RowId> rowIdsToDelete = new ArrayList<>();
+                    Map<UUID, ByteString> rowIdsToDelete = new HashMap<>();
                     Collection<BinaryRow> result = new ArrayList<>();
 
                     int futNum = 0;
@@ -1202,14 +1226,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         RowId lockedRowId = 
deleteExactLockFuts[futNum++].join();
 
                         if (lockedRowId != null) {
-                            rowIdsToDelete.add(lockedRowId);
+                            rowIdsToDelete.put(lockedRowId.uuid(), null);
                         } else {
                             result.add(row);
                         }
                     }
 
-                    CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? 
completedFuture(null)
-                            : applyCmdWithExceptionHandling(new 
UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
+                    CompletableFuture<Object> raftFut = 
rowIdsToDelete.isEmpty() ? completedFuture(null)
+                            : 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowIdsToDelete, txId));
 
                     return raftFut.thenApply(ignored -> result);
                 });
@@ -1227,6 +1251,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 return allOf(pkReadLockFuts).thenCompose(ignore -> {
                     Collection<BinaryRow> result = new ArrayList<>();
                     Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
+                    Set<ByteBuffer> uniqueKeys = new HashSet<>();
 
                     int futNum = 0;
 
@@ -1236,7 +1261,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         if (lockedRow != null) {
                             result.add(row);
                         } else {
-                            if (rowsToInsert.values().stream().noneMatch(row0 
-> row0.keySlice().equals(row.keySlice()))) {
+                            ByteBuffer keyToCheck = row.keySlice();
+                            if (uniqueKeys.add(keyToCheck)) {
                                 rowsToInsert.put(new RowId(partId), row);
                             } else {
                                 result.add(row);
@@ -1256,10 +1282,14 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                         insertLockFuts[idx++] = 
takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
                     }
 
+                    Map<UUID, ByteString> convertedMap = 
rowsToInsert.entrySet().stream().collect(
+                            Collectors.toMap(
+                                    e -> e.getKey().uuid(),
+                                    e -> new 
ByteString(e.getValue().byteBuffer())));
+
                     return allOf(insertLockFuts)
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateAllCommand(committedPartitionId, 
rowsToInsert, txId)))
-                            .thenApply(ignored -> result);
+                                    updateAllCommand(committedPartitionId, 
convertedMap, txId))).thenApply(ignored -> result);
                 });
             }
             case RW_UPSERT_ALL: {
@@ -1280,21 +1310,21 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 }
 
                 return allOf(rowIdFuts).thenCompose(ignore -> {
-                    Map<RowId, BinaryRow> rowsToUpdate = new HashMap<>();
+                    Map<UUID, ByteString> rowsToUpdate = new HashMap<>();
 
                     int futNum = 0;
 
                     for (BinaryRow row : request.binaryRows()) {
                         RowId lockedRow = rowIdFuts[futNum++].join();
 
-                        rowsToUpdate.put(lockedRow, row);
+                        rowsToUpdate.put(lockedRow.uuid(), new 
ByteString(row.byteBuffer()));
                     }
 
                     if (rowsToUpdate.isEmpty()) {
                         return completedFuture(null);
                     }
 
-                    return applyCmdWithExceptionHandling(new 
UpdateAllCommand(committedPartitionId, rowsToUpdate, txId))
+                    return 
applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, 
rowsToUpdate, txId))
                             .thenApply(ignored -> null);
                 });
             }
@@ -1361,7 +1391,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return takeLocksForDelete(searchRow, rowId, txId)
-                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, 
txId)))
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
                             .thenApply(ignored -> true);
                 });
             }
@@ -1372,7 +1403,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
 
                     return takeLocksForDelete(searchRow, rowId, txId)
-                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, 
txId)))
+                            .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), null, txId)))
                             .thenApply(ignored -> row);
                 });
             }
@@ -1388,7 +1420,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                     return completedFuture(false);
                                 }
 
-                                return applyCmdWithExceptionHandling(new 
UpdateCommand(commitPartitionId, validatedRowId, txId))
+                                return applyCmdWithExceptionHandling(
+                                        updateCommand(commitPartitionId, 
validatedRowId.uuid(), null, txId))
                                         .thenApply(ignored -> true);
                             });
                 });
@@ -1403,7 +1436,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateCommand(commitPartitionId, 
rowId0, searchRow, txId)))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId)))
                             .thenApply(ignored -> true);
                 });
             }
@@ -1419,7 +1452,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateCommand(commitPartitionId, 
rowId0, searchRow, txId)))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId)))
                             .thenApply(ignored -> null);
                 });
             }
@@ -1435,7 +1468,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return lockFut
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateCommand(commitPartitionId, 
rowId0, searchRow, txId)))
+                                    updateCommand(commitPartitionId, 
rowId0.uuid(), searchRow.byteBuffer(), txId)))
                             .thenApply(ignored -> row);
                 });
             }
@@ -1447,7 +1480,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateCommand(commitPartitionId, 
rowId, searchRow, txId)))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow.byteBuffer(), txId)))
                             .thenApply(ignored0 -> row);
                 });
             }
@@ -1459,7 +1492,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
                             .thenCompose(ignored -> 
applyCmdWithExceptionHandling(
-                                    new UpdateCommand(commitPartitionId, 
rowId, searchRow, txId)))
+                                    updateCommand(commitPartitionId, 
rowId.uuid(), searchRow.byteBuffer(), txId)))
                             .thenApply(ignored -> true);
                 });
             }
@@ -1602,7 +1635,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 return completedFuture(false);
                             }
 
-                            return applyCmdWithExceptionHandling(new 
UpdateCommand(commitPartitionId, validatedRowId, newRow, txId))
+                            return applyCmdWithExceptionHandling(
+                                    updateCommand(commitPartitionId, 
validatedRowId.uuid(), newRow.byteBuffer(), txId))
                                     .thenApply(ignored -> true);
                         });
             });
@@ -1789,4 +1823,80 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     }
                 });
     }
+
+    /**
+     * Compounds a RAFT group unique name.
+     *
+     * @param tblId Table identifier.
+     * @param partition Number of table partitions.
+     * @return A RAFT group name.
+     */
+    private String partitionRaftGroupName(UUID tblId, int partition) {
+        return tblId + "_part_" + partition;
+    }
+
+    /**
+     * Method to convert from {@link HybridTimestamp} object to 
NetworkMessage-based {@link HybridTimestampMessage} object.
+     *
+     * @param tmstmp {@link HybridTimestamp} object to convert to {@link 
HybridTimestampMessage}.
+     * @return {@link HybridTimestampMessage} object obtained from {@link 
HybridTimestamp}.
+     */
+    private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) {
+        return tmstmp != null ? msgFactory.hybridTimestampMessage()
+                .physical(tmstmp.getPhysical())
+                .logical(tmstmp.getLogical())
+                .build()
+                : null;
+    }
+
+    /**
+     * Method to construct {@link UpdateCommand} object.
+     *
+     * @param tablePartId {@link TablePartitionId} object to construct {@link 
UpdateCommand} object with.
+     * @param rowUuid Row UUID.
+     * @param rowBuf {@link ByteBuffer} representation of {@link BinaryRow}.
+     * @param txId Transaction ID.
+     * @return Constructed {@link UpdateCommand} object.
+     */
+    private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID 
rowUuid, ByteBuffer rowBuf, UUID txId) {
+        UpdateCommandBuilder bldr = msgFactory.updateCommand()
+                .tablePartitionId(tablePartitionId(tablePartId))
+                .rowUuid(rowUuid)
+                .txId(txId);
+
+        if (rowBuf != null) {
+            bldr.rowBuffer(new ByteString(rowBuf));
+        }
+
+        return bldr.build();
+    }
+
+    /**
+     * Method to construct {@link UpdateAllCommand} object.
+     *
+     * @param tablePartId {@link TablePartitionId} object to construct {@link 
UpdateCommand} object with.
+     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link 
ByteString}s to be updated.
+     * @param txId Transaction ID.
+     * @return Constructed {@link UpdateAllCommand} object.
+     */
+    private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, 
Map<UUID, ByteString> rowsToUpdate, UUID txId) {
+        return msgFactory.updateAllCommand()
+                .tablePartitionId(tablePartitionId(tablePartId))
+                .rowsToUpdate(rowsToUpdate)
+                .txId(txId)
+                .build();
+    }
+
+    /**
+     * Method to convert from {@link TablePartitionId} object to command-based 
{@link TablePartitionIdMessage} object.
+     *
+     * @param tablePartId {@link TablePartitionId} object to convert to {@link 
TablePartitionIdMessage}.
+     * @return {@link TablePartitionIdMessage} object converted from argument.
+     */
+    private TablePartitionIdMessage tablePartitionId(TablePartitionId 
tablePartId) {
+        return msgFactory.tablePartitionIdMessage()
+                .tableId(tablePartId.getTableId())
+                .partitionId(tablePartId.getPartId())
+                .build();
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 8d818f7ac6..eb6f5cb9b2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -35,18 +35,18 @@ import java.util.Objects;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
-import org.apache.ignite.internal.storage.RowId;
-import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.raft.jraft.util.ByteString;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -57,6 +57,9 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
     /** Key-value marshaller for tests. */
     protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
 
+    /** Message factory to create messages - RAFT commands. */
+    private TableMessagesFactory msgFactory = new TableMessagesFactory();
+
     @BeforeAll
     static void beforeAll() {
         var marshallerFactory = new ReflectionMarshallerFactory();
@@ -74,72 +77,100 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
 
     @Test
     public void testUpdateCommand() throws Exception {
-        UpdateCommand cmd = new UpdateCommand(
-                new TablePartitionId(UUID.randomUUID(), 1),
-                new RowId(1),
-                binaryRow(1),
-                UUID.randomUUID()
-        );
+        UpdateCommand cmd = msgFactory.updateCommand()
+                .tablePartitionId(msgFactory.tablePartitionIdMessage()
+                        .tableId(UUID.randomUUID())
+                        .partitionId(1)
+                        .build()
+                )
+                .rowUuid(Timestamp.nextVersion().toUuid())
+                .rowBuffer(byteStrFromBinaryRow(1))
+                .txId(UUID.randomUUID())
+                .build();
 
         UpdateCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
-        assertEquals(cmd.getRowId(), readCmd.getRowId());
-        assertArrayEquals(cmd.getRow().bytes(), readCmd.getRow().bytes());
+        assertEquals(cmd.rowUuid(), readCmd.rowUuid());
+        assertArrayEquals(cmd.rowBuffer().toByteArray(), 
readCmd.rowBuffer().toByteArray());
     }
 
     @Test
     public void testRemoveCommand() throws Exception {
-        UpdateCommand cmd = new UpdateCommand(new 
TablePartitionId(UUID.randomUUID(), 1), new RowId(1), UUID.randomUUID());
+        UpdateCommand cmd = msgFactory.updateCommand()
+                .tablePartitionId(msgFactory.tablePartitionIdMessage()
+                        .tableId(UUID.randomUUID())
+                        .partitionId(1)
+                        .build()
+                )
+                .rowUuid(Timestamp.nextVersion().toUuid())
+                .txId(UUID.randomUUID())
+                .build();
 
         UpdateCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
-        assertEquals(cmd.getRowId(), readCmd.getRowId());
-        assertNull(readCmd.getRow());
+        assertEquals(cmd.rowUuid(), readCmd.rowUuid());
+        assertNull(readCmd.rowBuffer());
     }
 
     @Test
     public void testUpdateAllCommand() throws Exception {
-        HashMap<RowId, BinaryRow> rowsToUpdate = new HashMap();
+        HashMap<UUID, ByteString> rowsToUpdate = new HashMap();
 
         for (int i = 0; i < 10; i++) {
-            rowsToUpdate.put(new RowId(i), binaryRow(i));
+            rowsToUpdate.put(Timestamp.nextVersion().toUuid(), 
byteStrFromBinaryRow(i));
         }
 
-        var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 
1), rowsToUpdate, UUID.randomUUID());
+        var cmd = msgFactory.updateAllCommand()
+                .tablePartitionId(msgFactory.tablePartitionIdMessage()
+                        .tableId(UUID.randomUUID())
+                        .partitionId(1)
+                        .build()
+                )
+                .rowsToUpdate(rowsToUpdate)
+                .txId(UUID.randomUUID())
+                .build();
 
         UpdateAllCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
 
-        for (Map.Entry<RowId, BinaryRow> entry : 
cmd.getRowsToUpdate().entrySet()) {
-            assertTrue(readCmd.getRowsToUpdate().containsKey(entry.getKey()));
+        for (Map.Entry<UUID, ByteString> entry : 
cmd.rowsToUpdate().entrySet()) {
+            assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
 
-            var readVal = readCmd.getRowsToUpdate().get(entry.getKey());
+            var readVal = readCmd.rowsToUpdate().get(entry.getKey());
             var val = entry.getValue();
 
-            assertArrayEquals(val.bytes(), readVal.bytes());
+            assertArrayEquals(val.toByteArray(), readVal.toByteArray());
         }
     }
 
     @Test
     public void testRemoveAllCommand() throws Exception {
-        ArrayList<RowId> rowsToRemove = new ArrayList<>();
+        Map<UUID, ByteString> rowsToRemove = new HashMap<>();
 
         for (int i = 0; i < 10; i++) {
-            rowsToRemove.add(new RowId(i));
+            rowsToRemove.put(Timestamp.nextVersion().toUuid(), null);
         }
 
-        var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 
1), rowsToRemove, UUID.randomUUID());
+        var cmd = msgFactory.updateAllCommand()
+                .tablePartitionId(msgFactory.tablePartitionIdMessage()
+                        .tableId(UUID.randomUUID())
+                        .partitionId(1)
+                        .build()
+                )
+                .rowsToUpdate(rowsToRemove)
+                .txId(UUID.randomUUID())
+                .build();
 
         UpdateAllCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
 
-        for (RowId rowId : cmd.getRowsToUpdate().keySet()) {
-            assertTrue(readCmd.getRowsToUpdate().containsKey(rowId));
-            assertNull(readCmd.getRowsToUpdate().get(rowId));
+        for (UUID uuid : cmd.rowsToUpdate().keySet()) {
+            assertTrue(readCmd.rowsToUpdate().containsKey(uuid));
+            assertNull(readCmd.rowsToUpdate().get(uuid));
         }
     }
 
@@ -147,7 +178,11 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
     public void testTxCleanupCommand() throws Exception {
         HybridClock clock = new HybridClockImpl();
 
-        TxCleanupCommand cmd = new TxCleanupCommand(UUID.randomUUID(), true, 
clock.now());
+        TxCleanupCommand cmd = msgFactory.txCleanupCommand()
+                .txId(UUID.randomUUID())
+                .commit(true)
+                .commitTimestamp(hybridTimestampMessage(clock.now()))
+                .build();
 
         TxCleanupCommand readCmd = copyCommand(cmd);
 
@@ -159,20 +194,35 @@ public class PartitionRaftCommandsSerializationTest 
extends IgniteAbstractTest {
     @Test
     public void testFinishTxCommand() throws Exception {
         HybridClock clock = new HybridClockImpl();
-        ArrayList<ReplicationGroupId> grps = new ArrayList<>(10);
+        ArrayList<TablePartitionIdMessage> grps = new ArrayList<>(10);
 
         for (int i = 0; i < 10; i++) {
-            grps.add(new TablePartitionId(UUID.randomUUID(), i));
+            grps.add(msgFactory.tablePartitionIdMessage()
+                    .tableId(UUID.randomUUID())
+                    .partitionId(i)
+                    .build());
         }
 
-        FinishTxCommand cmd = new FinishTxCommand(UUID.randomUUID(), true, 
clock.now(), grps);
+        FinishTxCommand cmd = msgFactory.finishTxCommand()
+                .txId(UUID.randomUUID())
+                .commit(true)
+                .commitTimestamp(hybridTimestampMessage(clock.now()))
+                .tablePartitionIds(grps)
+                .build();
 
         FinishTxCommand readCmd = copyCommand(cmd);
 
         assertEquals(cmd.txId(), readCmd.txId());
         assertEquals(cmd.commit(), readCmd.commit());
         assertEquals(cmd.commitTimestamp(), readCmd.commitTimestamp());
-        assertEquals(cmd.replicationGroupIds(), readCmd.replicationGroupIds());
+        assertEquals(cmd.tablePartitionIds(), readCmd.tablePartitionIds());
+    }
+
+    private HybridTimestampMessage hybridTimestampMessage(HybridTimestamp 
tmstmp) {
+        return msgFactory.hybridTimestampMessage()
+                .logical(tmstmp.getLogical())
+                .physical(tmstmp.getPhysical())
+                .build();
     }
 
     private <T> T copyCommand(T cmd) throws Exception {
@@ -199,8 +249,9 @@ public class PartitionRaftCommandsSerializationTest extends 
IgniteAbstractTest {
         }
     }
 
-    private static BinaryRow binaryRow(int id) throws Exception {
-        return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new 
TestValue(id, String.valueOf(id)));
+    private static ByteString byteStrFromBinaryRow(int id) throws Exception {
+        return new ByteString(kvMarshaller.marshal(new TestKey(id, 
String.valueOf(id)), new TestValue(id, String.valueOf(id)))
+                .byteBuffer());
     }
 
     /**
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 8a56351b87..885f8df449 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -40,11 +40,9 @@ import java.io.Serializable;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
@@ -72,10 +70,10 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
 import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
-import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -94,6 +92,7 @@ import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.CommittedConfiguration;
+import org.apache.ignite.raft.jraft.util.ByteString;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -154,6 +153,9 @@ public class PartitionCommandListenerTest {
     @WorkDirectory
     private Path workDir;
 
+    /** Factory for command messages. */
+    private TableMessagesFactory msgFactory = new TableMessagesFactory();
+
     @Captor
     private ArgumentCaptor<Throwable> commandClosureResultCaptor;
 
@@ -174,7 +176,8 @@ public class PartitionCommandListenerTest {
                 partitionDataStorage,
                 txStateStorage,
                 new TxManagerImpl(replicaService, new HeapLockManager(), new 
HybridClockImpl()),
-                () -> Map.of(pkStorage.id(), pkStorage)
+                () -> Map.of(pkStorage.id(), pkStorage),
+                PARTITION_ID
         );
     }
 
@@ -262,7 +265,8 @@ public class PartitionCommandListenerTest {
                 partitionDataStorage,
                 txStateStorage,
                 new TxManagerImpl(replicaService, new HeapLockManager(), new 
HybridClockImpl()),
-                () -> Map.of(pkStorage.id(), pkStorage)
+                () -> Map.of(pkStorage.id(), pkStorage),
+                PARTITION_ID
         );
 
         txStateStorage.lastApplied(3L, 1L);
@@ -486,18 +490,35 @@ public class PartitionCommandListenerTest {
      * Inserts all rows.
      */
     private void insertAll() {
-        HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
+        HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT);
         UUID txId = Timestamp.nextVersion().toUuid();
         var commitPartId = new TablePartitionId(txId, PARTITION_ID);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             Row row = getTestRow(i, i);
 
-            rows.put(new RowId(PARTITION_ID), row);
+            rows.put(Timestamp.nextVersion().toUuid(), new 
ByteString(row.byteBuffer()));
         }
 
-        invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId));
-        invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
+        HybridTimestamp commitTimestamp = CLOCK.now();
+
+        invokeBatchedCommand(msgFactory.updateAllCommand()
+                .tablePartitionId(
+                        msgFactory.tablePartitionIdMessage()
+                                .tableId(commitPartId.getTableId())
+                                .partitionId(commitPartId.getPartId())
+                                .build())
+                .rowsToUpdate(rows)
+                .txId(txId)
+                .build());
+        invokeBatchedCommand(msgFactory.txCleanupCommand()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(msgFactory.hybridTimestampMessage()
+                        .physical(commitTimestamp.getPhysical())
+                        .logical(commitTimestamp.getLogical())
+                        .build())
+                .build());
     }
 
     /**
@@ -508,16 +529,33 @@ public class PartitionCommandListenerTest {
     private void updateAll(Function<Integer, Integer> keyValueMapper) {
         UUID txId = Timestamp.nextVersion().toUuid();
         var commitPartId = new TablePartitionId(txId, PARTITION_ID);
-        HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
+        HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             Row row = getTestRow(i, keyValueMapper.apply(i));
 
-            rows.put(readRow(row), row);
+            rows.put(readRow(row).uuid(), new ByteString(row.byteBuffer()));
         }
 
-        invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId));
-        invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
+        HybridTimestamp commitTimestamp = CLOCK.now();
+
+        invokeBatchedCommand(msgFactory.updateAllCommand()
+                .tablePartitionId(
+                        msgFactory.tablePartitionIdMessage()
+                                .tableId(commitPartId.getTableId())
+                                .partitionId(commitPartId.getPartId())
+                                .build())
+                .rowsToUpdate(rows)
+                .txId(txId)
+                .build());
+        invokeBatchedCommand(msgFactory.txCleanupCommand()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(msgFactory.hybridTimestampMessage()
+                        .physical(commitTimestamp.getPhysical())
+                        .logical(commitTimestamp.getLogical())
+                        .build())
+                .build());
     }
 
     /**
@@ -526,16 +564,33 @@ public class PartitionCommandListenerTest {
     private void deleteAll() {
         UUID txId = Timestamp.nextVersion().toUuid();
         var commitPartId = new TablePartitionId(txId, PARTITION_ID);
-        Set<RowId> keyRows = new HashSet<>(KEY_COUNT);
+        Map<UUID, ByteString> keyRows = new HashMap<>(KEY_COUNT);
 
         for (int i = 0; i < KEY_COUNT; i++) {
             Row row = getTestRow(i, i);
 
-            keyRows.add(readRow(row));
+            keyRows.put(readRow(row).uuid(), null);
         }
 
-        invokeBatchedCommand(new UpdateAllCommand(commitPartId, keyRows, 
txId));
-        invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
+        HybridTimestamp commitTimestamp = CLOCK.now();
+
+        invokeBatchedCommand(msgFactory.updateAllCommand()
+                .tablePartitionId(
+                        msgFactory.tablePartitionIdMessage()
+                                .tableId(commitPartId.getTableId())
+                                .partitionId(commitPartId.getPartId())
+                                .build())
+                .rowsToUpdate(keyRows)
+                .txId(txId)
+                .build());
+        invokeBatchedCommand(msgFactory.txCleanupCommand()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(msgFactory.hybridTimestampMessage()
+                        .physical(commitTimestamp.getPhysical())
+                        .logical(commitTimestamp.getLogical())
+                        .build())
+                .build());
     }
 
     /**
@@ -550,7 +605,6 @@ public class PartitionCommandListenerTest {
             UUID txId = Timestamp.nextVersion().toUuid();
             Row row = getTestRow(i, keyValueMapper.apply(i));
             RowId rowId = readRow(row);
-            var commitPartId = new TablePartitionId(txId, PARTITION_ID);
 
             assertNotNull(rowId);
 
@@ -558,7 +612,15 @@ public class PartitionCommandListenerTest {
 
             when(clo.index()).thenReturn(raftIndex.incrementAndGet());
 
-            when(clo.command()).thenReturn(new UpdateCommand(commitPartId, 
rowId, row, txId));
+            when(clo.command()).thenReturn(
+                    msgFactory.updateCommand()
+                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
+                                    .tableId(txId)
+                                    .partitionId(PARTITION_ID).build())
+                            .rowUuid(new UUID(rowId.mostSignificantBits(), 
rowId.leastSignificantBits()))
+                            .rowBuffer(new ByteString(row.byteBuffer()))
+                            .txId(txId)
+                            .build());
 
             doAnswer(invocation -> {
                 assertNull(invocation.getArgument(0));
@@ -567,7 +629,16 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, 
true, CLOCK.now())));
+        HybridTimestamp commitTimestamp = CLOCK.now();
+
+        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(msgFactory.hybridTimestampMessage()
+                        .physical(commitTimestamp.getPhysical())
+                        .logical(commitTimestamp.getLogical())
+                        .build())
+                .build()));
     }
 
     /**
@@ -580,7 +651,6 @@ public class PartitionCommandListenerTest {
             UUID txId = Timestamp.nextVersion().toUuid();
             Row row = getTestRow(i, i);
             RowId rowId = readRow(row);
-            var commitPartId = new TablePartitionId(txId, PARTITION_ID);
 
             assertNotNull(rowId);
 
@@ -588,7 +658,14 @@ public class PartitionCommandListenerTest {
 
             when(clo.index()).thenReturn(raftIndex.incrementAndGet());
 
-            when(clo.command()).thenReturn(new UpdateCommand(commitPartId, 
rowId, txId));
+            when(clo.command()).thenReturn(
+                    msgFactory.updateCommand()
+                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
+                                    .tableId(txId)
+                                    .partitionId(PARTITION_ID).build())
+                            .rowUuid(new UUID(rowId.mostSignificantBits(), 
rowId.leastSignificantBits()))
+                            .txId(txId)
+                            .build());
 
             doAnswer(invocation -> {
                 assertNull(invocation.getArgument(0));
@@ -597,7 +674,16 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, 
true, CLOCK.now())));
+        HybridTimestamp commitTimestamp = CLOCK.now();
+
+        txIds.forEach(txId -> 
invokeBatchedCommand(msgFactory.txCleanupCommand()
+                .txId(txId)
+                .commit(true)
+                .commitTimestamp(msgFactory.hybridTimestampMessage()
+                        .physical(commitTimestamp.getPhysical())
+                        .logical(commitTimestamp.getLogical())
+                        .build())
+                .build()));
     }
 
     /**
@@ -648,7 +734,15 @@ public class PartitionCommandListenerTest {
 
             when(clo.index()).thenReturn(raftIndex.incrementAndGet());
 
-            when(clo.command()).thenReturn(new UpdateCommand(commitPartId, new 
RowId(PARTITION_ID), row, txId));
+            when(clo.command()).thenReturn(
+                    msgFactory.updateCommand()
+                            
.tablePartitionId(msgFactory.tablePartitionIdMessage()
+                                    .tableId(txId)
+                                    .partitionId(PARTITION_ID).build())
+                            .rowUuid(Timestamp.nextVersion().toUuid())
+                            .rowBuffer(new ByteString(row.byteBuffer()))
+                            .txId(txId)
+                            .build());
 
             doAnswer(invocation -> {
                 assertNull(invocation.getArgument(0));
@@ -657,7 +751,19 @@ public class PartitionCommandListenerTest {
             }).when(clo).result(any());
         }));
 
-        txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, 
true, CLOCK.now())));
+        HybridTimestamp now = CLOCK.now();
+
+
+
+        txIds.forEach(txId -> invokeBatchedCommand(
+                msgFactory.txCleanupCommand()
+                        .txId(txId)
+                        .commit(true)
+                        .commitTimestamp(msgFactory.hybridTimestampMessage()
+                                .physical(now.getPhysical())
+                                .logical(now.getLogical())
+                                .build())
+                        .build()));
     }
 
     /**
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index f25de52421..7e2e6bbc18 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -261,7 +261,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 new TestPartitionDataStorage(mvPartStorage),
                 txStateStorage().getOrCreateTxStateStorage(0),
                 this.txManager,
-                () -> Map.of(pkStorage.get().id(), pkStorage.get())
+                () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+                0
         );
     }
 

Reply via email to