This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 51d25d217a IGNITE-17886 Table commands migrated to use NetworkMessage-based code generation infrastructure 51d25d217a is described below commit 51d25d217af468d9611ee2536d929a017506b9a6 Author: Sergey Chugunov <sergey.chugu...@gmail.com> AuthorDate: Fri Nov 18 17:19:21 2022 +0300 IGNITE-17886 Table commands migrated to use NetworkMessage-based code generation infrastructure Signed-off-by: Ivan Bessonov <bessonov...@gmail.com> --- .../ignite/distributed/ItTablePersistenceTest.java | 3 +- .../distributed/ItTxDistributedTestSingleNode.java | 3 +- .../ignite/internal/table/ItColocationTest.java | 49 ++++-- .../internal/table/distributed/TableManager.java | 6 +- .../table/distributed/TableMessageGroup.java | 29 ++++ .../table/distributed/command/FinishTxCommand.java | 56 +------ ...ionCommand.java => HybridTimestampMessage.java} | 34 ++-- .../distributed/command/PartitionCommand.java | 24 +-- ...onCommand.java => TablePartitionIdMessage.java} | 33 ++-- .../distributed/command/TxCleanupCommand.java | 42 +---- .../distributed/command/UpdateAllCommand.java | 106 ++----------- .../table/distributed/command/UpdateCommand.java | 92 ++--------- .../table/distributed/raft/PartitionListener.java | 44 ++++-- .../replicator/PartitionReplicaListener.java | 176 +++++++++++++++++---- .../PartitionRaftCommandsSerializationTest.java | 121 ++++++++++---- .../raft/PartitionCommandListenerTest.java | 156 +++++++++++++++--- .../table/impl/DummyInternalTableImpl.java | 3 +- 17 files changed, 522 insertions(+), 455 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index d74a09714d..6a4601d466 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -274,7 +274,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti new TestPartitionDataStorage(testMpPartStorage), new TestTxStateStorage(), txManager, - Map::of + Map::of, + 0 ); paths.put(listener, workDir); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 50c6109df6..efe23949f4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -432,7 +432,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { new TestPartitionDataStorage(testMpPartStorage), new TestTxStateStorage(), txManagers.get(node), - () -> Map.of(pkStorage.get().id(), pkStorage.get()) + () -> Map.of(pkStorage.get().id(), pkStorage.get()), + partId ); }, RaftGroupOptions.defaults() diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 23312776c4..5cf66004ef 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -56,8 +56,8 @@ import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; +import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.schema.NativeTypeSpec; @@ -67,8 +67,8 @@ import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; -import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse; @@ -78,6 +78,7 @@ import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; +import org.apache.ignite.internal.tx.Timestamp; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.TxManagerImpl; @@ -90,6 +91,7 @@ import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.client.Command; import org.apache.ignite.raft.client.Peer; import org.apache.ignite.raft.client.service.RaftGroupService; +import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -113,6 +115,9 @@ public class ItColocationTest { /** Map of the Raft commands are set by table operation. */ private static final Int2ObjectMap<Set<Command>> CMDS_MAP = new Int2ObjectOpenHashMap<>(); + /** Message factory to create messages - RAFT commands. */ + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + private SchemaDescriptor schema; private SchemaRegistry schemaRegistry; @@ -184,20 +189,34 @@ public class ItColocationTest { RaftGroupService r = groupRafts.get(request.groupId()); if (request instanceof ReadWriteMultiRowReplicaRequest) { - Map<RowId, BinaryRow> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRows() + Map<UUID, ByteString> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRows() .stream() - .collect(toMap(row -> new RowId(0), row -> row)); - - return r.run(new UpdateAllCommand(commitPartId, rows, UUID.randomUUID())); + .collect(toMap( + row -> Timestamp.nextVersion().toUuid(), row -> new ByteString(row.byteBuffer()))); + + return r.run(MSG_FACTORY.updateAllCommand() + .tablePartitionId(MSG_FACTORY.tablePartitionIdMessage() + .tableId(commitPartId.getTableId()) + .partitionId(commitPartId.getPartId()) + .build() + ) + .rowsToUpdate(rows) + .txId(UUID.randomUUID()) + .build()); } else { assertThat(request, is(instanceOf(ReadWriteSingleRowReplicaRequest.class))); - return r.run(new UpdateCommand( - commitPartId, - new RowId(0), - ((ReadWriteSingleRowReplicaRequest) request).binaryRow(), - UUID.randomUUID()) - ); + return r.run(MSG_FACTORY.updateCommand() + .tablePartitionId( + MSG_FACTORY.tablePartitionIdMessage() + .tableId(commitPartId.getTableId()) + .partitionId(commitPartId.getPartId()) + .build() + ) + .rowUuid(Timestamp.nextVersion().toUuid()) + .rowBuffer(new ByteString(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer())) + .txId(UUID.randomUUID()) + .build()); } }); @@ -366,10 +385,10 @@ public class ItColocationTest { CMDS_MAP.forEach((p, set) -> { UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set); - assertEquals(partsMap.get(p), cmd.getRowsToUpdate().size(), () -> "part=" + p + ", set=" + set); + assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set); - cmd.getRowsToUpdate().values().forEach(binRow -> { - Row r = new Row(schema, binRow); + cmd.rowsToUpdate().values().forEach(byteStr -> { + Row r = new Row(schema, new ByteBufferRow(byteStr.toByteArray())); assertEquals(INT_TABLE.partition(r), p); }); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index ff0fd9a51d..e6f7d1663a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -776,7 +776,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp partitionDataStorage(mvPartitionStorage, internalTbl, partId), txStatePartitionStorage, txManager, - table.indexStorageAdapters(partId) + table.indexStorageAdapters(partId), + partId ), new RebalanceRaftGroupEventsListener( metaStorageMgr, @@ -1824,7 +1825,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp partitionDataStorage(mvPartitionStorage, internalTable, partId), txStatePartitionStorage, txManager, - tbl.indexStorageAdapters(partId) + tbl.indexStorageAdapters(partId), + partId ); RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener( diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 3eb11ed25f..2a4f6f96c8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -17,6 +17,12 @@ package org.apache.ignite.internal.table.distributed; +import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; +import org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage; +import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; +import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; +import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; +import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.message.HasDataRequest; import org.apache.ignite.internal.table.distributed.message.HasDataResponse; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest; @@ -125,4 +131,27 @@ public interface TableMessageGroup { * Message type for {@link SnapshotTxDataResponse}. */ short SNAPSHOT_TX_DATA_RESPONSE = 16; + + /** + * Message types for Table module RAFT commands. + */ + public interface Commands { + /** Message type for {@link FinishTxCommand}. */ + short FINISH_TX = 40; + + /** Message type for {@link TxCleanupCommand}. */ + short TX_CLEANUP = 41; + + /** Message type for {@link UpdateAllCommand}. */ + short UPDATE_ALL = 42; + + /** Message type for {@link UpdateCommand}. */ + short UPDATE = 43; + + /** Message type for {@link HybridTimestampMessage}. */ + short HYBRID_TIMESTAMP = 60; + + /** Message type for {@link TablePartitionIdMessage}. */ + short TABLE_PARTITION_ID = 61; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java index f640f2c33f..77a7f789e2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java @@ -18,68 +18,26 @@ package org.apache.ignite.internal.table.distributed.command; import java.util.List; -import java.util.UUID; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Transferable; /** * State machine command to finish a transaction on a commit or a rollback. */ -public class FinishTxCommand extends PartitionCommand { - /** - * A commit or a rollback state. - */ - private final boolean commit; - - /** - * Transaction commit timestamp. - */ - private final HybridTimestamp commitTimestamp; - - /** - * Replication groups ids. - */ - private final List<ReplicationGroupId> replicationGroupIds; - - /** - * The constructor. - * - * @param txId The txId. - * @param commit Commit or rollback state {@code True} to commit. - * @param commitTimestamp Transaction commit timestamp. - * @param replicationGroupIds Set of replication groups ids. - */ - public FinishTxCommand(UUID txId, boolean commit, HybridTimestamp commitTimestamp, List<ReplicationGroupId> replicationGroupIds) { - super(txId); - this.commit = commit; - this.commitTimestamp = commitTimestamp; - this.replicationGroupIds = replicationGroupIds; - } - +@Transferable(TableMessageGroup.Commands.FINISH_TX) +public interface FinishTxCommand extends PartitionCommand { /** * Returns a commit or a rollback state. - * - * @return A commit or a rollback state. */ - public boolean commit() { - return commit; - } + boolean commit(); /** * Returns a transaction commit timestamp. - * - * @return A transaction commit timestamp. */ - public HybridTimestamp commitTimestamp() { - return commitTimestamp; - } + HybridTimestampMessage commitTimestamp(); /** * Returns an ordered replication groups ids. - * - * @return An ordered replication groups ids. */ - public List<ReplicationGroupId> replicationGroupIds() { - return replicationGroupIds; - } + List<TablePartitionIdMessage> tablePartitionIds(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java similarity index 58% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java index a039fe3d0c..b69545d1c5 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/HybridTimestampMessage.java @@ -17,32 +17,22 @@ package org.apache.ignite.internal.table.distributed.command; -import java.util.UUID; -import org.apache.ignite.raft.client.WriteCommand; -import org.jetbrains.annotations.NotNull; +import java.io.Serializable; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Transferable; /** - * Partition transactional command. + * Interface to represent {@link HybridTimestamp} as a {@link NetworkMessage}. */ -public abstract class PartitionCommand implements WriteCommand { - /** Transaction id. */ - private UUID txId; +@Transferable(TableMessageGroup.Commands.HYBRID_TIMESTAMP) +public interface HybridTimestampMessage extends NetworkMessage, Serializable { + long physical(); - /** - * The constructor. - * - * @param txId Transaction id. - */ - public PartitionCommand(@NotNull UUID txId) { - this.txId = txId; - } + int logical(); - /** - * Gets a transaction id. - * - * @return Transaction id. - */ - public UUID txId() { - return txId; + default HybridTimestamp asHybridTimestamp() { + return new HybridTimestamp(physical(), logical()); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java index a039fe3d0c..555f3f4f6b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java @@ -18,31 +18,15 @@ package org.apache.ignite.internal.table.distributed.command; import java.util.UUID; +import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.raft.client.WriteCommand; -import org.jetbrains.annotations.NotNull; /** * Partition transactional command. */ -public abstract class PartitionCommand implements WriteCommand { - /** Transaction id. */ - private UUID txId; - - /** - * The constructor. - * - * @param txId Transaction id. - */ - public PartitionCommand(@NotNull UUID txId) { - this.txId = txId; - } - +public interface PartitionCommand extends WriteCommand, NetworkMessage { /** - * Gets a transaction id. - * - * @return Transaction id. + * Returns a transaction id. */ - public UUID txId() { - return txId; - } + UUID txId(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java similarity index 57% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java index a039fe3d0c..9514efec93 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TablePartitionIdMessage.java @@ -17,32 +17,23 @@ package org.apache.ignite.internal.table.distributed.command; +import java.io.Serializable; import java.util.UUID; -import org.apache.ignite.raft.client.WriteCommand; -import org.jetbrains.annotations.NotNull; +import org.apache.ignite.internal.table.distributed.TableMessageGroup.Commands; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.annotations.Transferable; /** - * Partition transactional command. + * Network Message interface to transfer {@link TablePartitionId} as part of {@link NetworkMessage}. */ -public abstract class PartitionCommand implements WriteCommand { - /** Transaction id. */ - private UUID txId; +@Transferable(Commands.TABLE_PARTITION_ID) +public interface TablePartitionIdMessage extends NetworkMessage, Serializable { + UUID tableId(); - /** - * The constructor. - * - * @param txId Transaction id. - */ - public PartitionCommand(@NotNull UUID txId) { - this.txId = txId; - } + int partitionId(); - /** - * Gets a transaction id. - * - * @return Transaction id. - */ - public UUID txId() { - return txId; + default TablePartitionId asTablePartitionId() { + return new TablePartitionId(tableId(), partitionId()); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java index 9a8af04bd5..f336faaea2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/TxCleanupCommand.java @@ -17,51 +17,21 @@ package org.apache.ignite.internal.table.distributed.command; -import java.util.UUID; -import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Transferable; /** * State machine command to cleanup on a transaction commit. */ -public class TxCleanupCommand extends PartitionCommand { - /** - * A commit or a rollback state. - */ - private final boolean commit; - - /** - * Transaction commit timestamp. - */ - private final HybridTimestamp commitTimestamp; - - /** - * The constructor. - * - * @param txId The txId. - * @param commit Commit or rollback state {@code True} to commit. - * @param commitTimestamp Transaction commit timestamp. - */ - public TxCleanupCommand(UUID txId, boolean commit, HybridTimestamp commitTimestamp) { - super(txId); - this.commit = commit; - this.commitTimestamp = commitTimestamp; - } - +@Transferable(TableMessageGroup.Commands.TX_CLEANUP) +public interface TxCleanupCommand extends PartitionCommand { /** * Returns a commit or a rollback state. - * - * @return A commit or a rollback state. */ - public boolean commit() { - return commit; - } + boolean commit(); /** * Returns a transaction commit timestamp. - * - * @return A transaction commit timestamp. */ - public HybridTimestamp commitTimestamp() { - return commitTimestamp; - } + HybridTimestampMessage commitTimestamp(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java index 229cb99705..46e112f05c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java @@ -17,108 +17,20 @@ package org.apache.ignite.internal.table.distributed.command; -import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.UUID; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.storage.RowId; -import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; -import org.apache.ignite.internal.util.CollectionUtils; -import org.jetbrains.annotations.NotNull; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.raft.jraft.util.ByteString; /** * State machine command for updating a batch of entries. */ -public class UpdateAllCommand extends PartitionCommand { - /** Committed table partition id. */ - private final TablePartitionId commitReplicationGroupId; +@Transferable(TableMessageGroup.Commands.UPDATE_ALL) +public interface UpdateAllCommand extends PartitionCommand { + TablePartitionIdMessage tablePartitionId(); - /** Rows to update. */ - private transient HashMap<RowId, BinaryRow> rowsToUpdate; - - /** Bytes representation of a row to update map. */ - private byte[] rowsToUpdateBytes; - - /** - * Constructor for batch remove. - * - * @param commitReplicationGroupId Committed table partition id. - * @param removeRows Ids to remove. - * @param txId Transaction id. - */ - public UpdateAllCommand(@NotNull TablePartitionId commitReplicationGroupId, Collection<RowId> removeRows, @NotNull UUID txId) { - this(commitReplicationGroupId, removeRows, null, txId); - } - - /** - * Constructor for a batch update. - * - * @param commitReplicationGroupId Committed table partition id. - * @param rowsToUpdate Rows to update or insert. - * @param txId Transaction id. - */ - public UpdateAllCommand(@NotNull TablePartitionId commitReplicationGroupId, Map<RowId, BinaryRow> rowsToUpdate, @NotNull UUID txId) { - this(commitReplicationGroupId, null, rowsToUpdate, txId); - } - - /** - * The constructor. - * - * @param commitReplicationGroupId Committed table partition id. - * @param removeRows Ids to remove. - * @param rowsToUpdate Rows to update or insert. - * @param txId Transaction id. - */ - private UpdateAllCommand( - @NotNull TablePartitionId commitReplicationGroupId, - Collection<RowId> removeRows, - Map<RowId, BinaryRow> rowsToUpdate, - @NotNull UUID txId - ) { - super(txId); - - this.commitReplicationGroupId = commitReplicationGroupId; - - int size = (removeRows == null ? 0 : removeRows.size()) + (rowsToUpdate == null ? 0 : rowsToUpdate.size()); - - HashMap<RowId, BinaryRow> rows = new HashMap<>(size); - - if (!CollectionUtils.nullOrEmpty(removeRows)) { - removeRows.forEach(rowId -> rows.put(rowId, null)); - } - - if (!CollectionUtils.nullOrEmpty(rowsToUpdate)) { - rows.putAll(rowsToUpdate); - } - - this.rowsToUpdate = rows; - - rowsToUpdateBytes = CommandUtils.rowMapToBytes(rows); - } - - /** - * Gets a table partition id that the commit partition. - * - * @return Table partition id. - */ - public TablePartitionId getReplicationGroupId() { - return commitReplicationGroupId; - } - - - /** - * Gets rows to update. - * - * @return Rows to update map. - */ - public HashMap<RowId, BinaryRow> getRowsToUpdate() { - if (rowsToUpdate == null) { - rowsToUpdate = new HashMap<>(); - - CommandUtils.readRowMap(rowsToUpdateBytes, rowsToUpdate::put); - } - - return rowsToUpdate; - } + @Marshallable + Map<UUID, ByteString> rowsToUpdate(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java index 719db8e50e..468a5c8f4d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java @@ -18,92 +18,20 @@ package org.apache.ignite.internal.table.distributed.command; import java.util.UUID; -import org.apache.ignite.internal.schema.BinaryRow; -import org.apache.ignite.internal.schema.ByteBufferRow; -import org.apache.ignite.internal.storage.RowId; -import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.raft.jraft.util.ByteString; /** * State machine command to update a row specified by a row id. */ -public class UpdateCommand extends PartitionCommand { - /** Committed table partition id. */ - private final TablePartitionId commitReplicationGroupId; +@Transferable(TableMessageGroup.Commands.UPDATE) +public interface UpdateCommand extends PartitionCommand { + TablePartitionIdMessage tablePartitionId(); - /** Id of a row that will be updated. */ - private final RowId rowId; + UUID rowUuid(); - /** Binary row. */ - private transient BinaryRow row; - - /** Row bytes. */ - private byte[] rowBytes; - - /** - * Creates a new instance of UpdateCommand with the given row to be updated. The {@code rowId} should not be {@code null}. - * - * @param commitReplicationGroupId Committed table partition id. - * @param rowId Row id. - * @param row Binary row. - * @param txId The transaction id. - * @see PartitionCommand - */ - public UpdateCommand( - @NotNull TablePartitionId commitReplicationGroupId, - @NotNull RowId rowId, - @Nullable BinaryRow row, - @NotNull UUID txId - ) { - super(txId); - - this.commitReplicationGroupId = commitReplicationGroupId; - this.rowId = rowId; - this.row = row; - - this.rowBytes = CommandUtils.rowToBytes(row); - } - - /** - * Constructor for remove operation. - * - * @param commitReplicationGroupId Committed table partition id. - * @param rowId Row id. - * @param txId Transaction id. - */ - public UpdateCommand(@NotNull TablePartitionId commitReplicationGroupId, @NotNull RowId rowId, @NotNull UUID txId) { - this(commitReplicationGroupId, rowId, null, txId); - } - - /** - * Gets a table partition id that the commit partition. - * - * @return Table partition id. - */ - public TablePartitionId getCommitReplicationGroupId() { - return commitReplicationGroupId; - } - - /** - * Gets a row id that will be update. - * - * @return Row id. - */ - public RowId getRowId() { - return rowId; - } - - /** - * Gets a binary row. - * - * @return Binary key. - */ - public BinaryRow getRow() { - if (row == null && rowBytes != null) { - row = new ByteBufferRow(rowBytes); - } - - return row; - } + @Marshallable + ByteString rowBuffer(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index f25117ec10..5bb70b9787 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -34,15 +34,18 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RaftGroupConfiguration; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; +import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; @@ -57,6 +60,7 @@ import org.apache.ignite.raft.client.WriteCommand; import org.apache.ignite.raft.client.service.CommandClosure; import org.apache.ignite.raft.client.service.CommittedConfiguration; import org.apache.ignite.raft.client.service.RaftGroupListener; +import org.apache.ignite.raft.jraft.util.ByteString; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -78,6 +82,9 @@ public class PartitionListener implements RaftGroupListener { private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes; + /** Partition ID. */ + private int partitionId; + /** Rows that were inserted, updated or removed. */ private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>(); @@ -86,17 +93,20 @@ public class PartitionListener implements RaftGroupListener { * * @param partitionDataStorage The storage. * @param txManager Transaction manager. + * @param partitionId Partition ID this listener serves. */ public PartitionListener( PartitionDataStorage partitionDataStorage, TxStateStorage txStateStorage, TxManager txManager, - Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes + Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes, + int partitionId ) { this.storage = partitionDataStorage; this.txStateStorage = txStateStorage; this.txManager = txManager; this.indexes = indexes; + this.partitionId = partitionId; } @Override @@ -174,11 +184,12 @@ public class PartitionListener implements RaftGroupListener { } storage.runConsistently(() -> { - BinaryRow row = cmd.getRow(); - RowId rowId = cmd.getRowId(); + BinaryRow row = cmd.rowBuffer() != null ? new ByteBufferRow(cmd.rowBuffer().toByteArray()) : null; + UUID rowUuid = cmd.rowUuid(); + RowId rowId = new RowId(partitionId, rowUuid); UUID txId = cmd.txId(); - UUID commitTblId = cmd.getCommitReplicationGroupId().getTableId(); - int commitPartId = cmd.getCommitReplicationGroupId().getPartId(); + UUID commitTblId = cmd.tablePartitionId().tableId(); + int commitPartId = cmd.tablePartitionId().partitionId(); storage.addWrite(rowId, row, txId, commitTblId, commitPartId); @@ -207,14 +218,14 @@ public class PartitionListener implements RaftGroupListener { storage.runConsistently(() -> { UUID txId = cmd.txId(); - Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate(); - UUID commitTblId = cmd.getReplicationGroupId().getTableId(); - int commitPartId = cmd.getReplicationGroupId().getPartId(); + Map<UUID, ByteString> rowsToUpdate = cmd.rowsToUpdate(); + UUID commitTblId = cmd.tablePartitionId().tableId(); + int commitPartId = cmd.tablePartitionId().partitionId(); if (!nullOrEmpty(rowsToUpdate)) { - for (Map.Entry<RowId, BinaryRow> entry : rowsToUpdate.entrySet()) { - RowId rowId = entry.getKey(); - BinaryRow row = entry.getValue(); + for (Map.Entry<UUID, ByteString> entry : rowsToUpdate.entrySet()) { + RowId rowId = new RowId(partitionId, entry.getKey()); + BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue().toByteArray()) : null; storage.addWrite(rowId, row, txId, commitTblId, commitPartId); @@ -246,12 +257,15 @@ public class PartitionListener implements RaftGroupListener { UUID txId = cmd.txId(); - TxState stateToSet = cmd.commit() ? TxState.COMMITED : TxState.ABORTED; + TxState stateToSet = cmd.commit() ? COMMITED : ABORTED; TxMeta txMetaToSet = new TxMeta( stateToSet, - cmd.replicationGroupIds(), - cmd.commitTimestamp() + cmd.tablePartitionIds() + .stream() + .map(TablePartitionIdMessage::asTablePartitionId) + .collect(Collectors.toList()), + cmd.commitTimestamp() != null ? cmd.commitTimestamp().asHybridTimestamp() : null ); TxMeta txMetaBeforeCas = txStateStorage.get(txId); @@ -305,7 +319,7 @@ public class PartitionListener implements RaftGroupListener { Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet()); if (cmd.commit()) { - pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, cmd.commitTimestamp())); + pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, cmd.commitTimestamp().asHybridTimestamp())); } else { pendingRowIds.forEach(storage::abortWrite); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 136f79482f..6d6a9bf306 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -24,11 +24,14 @@ import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.lang.IgniteStringFormatter.format; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -65,11 +68,15 @@ import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.table.distributed.IndexLocker; import org.apache.ignite.internal.table.distributed.SortedIndexLocker; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; -import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; +import org.apache.ignite.internal.table.distributed.command.FinishTxCommandBuilder; +import org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage; +import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; +import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; @@ -103,11 +110,15 @@ import org.apache.ignite.network.TopologyService; import org.apache.ignite.raft.client.Command; import org.apache.ignite.raft.client.Peer; import org.apache.ignite.raft.client.service.RaftGroupService; +import org.apache.ignite.raft.jraft.util.ByteString; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** Partition replication listener. */ public class PartitionReplicaListener implements ReplicaListener { + /** Factory to create RAFT command messages. */ + private final TableMessagesFactory msgFactory = new TableMessagesFactory(); + /** Tx messages factory. */ private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); @@ -923,7 +934,9 @@ public class PartitionReplicaListener implements ReplicaListener { // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615 private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest request) { List<ReplicationGroupId> aggregatedGroupIds = request.groups().values().stream() - .flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList()); + .flatMap(List::stream) + .map(IgniteBiTuple::get1) + .collect(Collectors.toList()); UUID txId = request.txId(); @@ -967,14 +980,17 @@ public class PartitionReplicaListener implements ReplicaListener { HybridTimestamp commitTimestamp = commit ? hybridClock.now() : null; - CompletableFuture<Object> changeStateFuture = raftClient.run( - new FinishTxCommand( - txId, - commit, - commitTimestamp, - aggregatedGroupIds - ) - ).whenComplete((o, throwable) -> { + FinishTxCommandBuilder finishTxCmdBldr = msgFactory.finishTxCommand() + .txId(txId) + .commit(commit) + .tablePartitionIds(aggregatedGroupIds.stream() + .map(rgId -> tablePartitionId((TablePartitionId) rgId)).collect(Collectors.toList())); + + if (commit) { + finishTxCmdBldr.commitTimestamp(hybridTimestamp(commitTimestamp)); + } + + CompletableFuture<Object> changeStateFuture = raftClient.run(finishTxCmdBldr.build()).whenComplete((o, throwable) -> { fut.complete(new TxMeta(commit ? TxState.COMMITED : TxState.ABORTED, aggregatedGroupIds, commitTimestamp)); txTimestampUpdateMap.remove(txId); @@ -1004,8 +1020,16 @@ public class PartitionReplicaListener implements ReplicaListener { return failedFuture(e); } + HybridTimestampMessage timestampMsg = hybridTimestamp(request.commitTimestamp()); + + TxCleanupCommand txCleanupCmd = msgFactory.txCleanupCommand() + .txId(request.txId()) + .commit(request.commit()) + .commitTimestamp(timestampMsg) + .build(); + return raftClient - .run(new TxCleanupCommand(request.txId(), request.commit(), request.commitTimestamp())) + .run(txCleanupCmd) .thenRun(() -> lockManager.locks(request.txId()).forEachRemaining(lockManager::release)); } @@ -1154,7 +1178,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdLockFuts).thenCompose(ignore -> { - Collection<RowId> rowIdsToDelete = new ArrayList<>(); + Map<UUID, ByteString> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1163,7 +1187,7 @@ public class PartitionReplicaListener implements ReplicaListener { RowId lockedRowId = rowIdLockFuts[futNum++].join(); if (lockedRowId != null) { - rowIdsToDelete.add(lockedRowId); + rowIdsToDelete.put(lockedRowId.uuid(), null); } else { result.add(row); } @@ -1173,7 +1197,7 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(result); } - return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId)) + return applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, rowIdsToDelete, txId)) .thenApply(ignored -> result); }); } @@ -1193,7 +1217,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(deleteExactLockFuts).thenCompose(ignore -> { - Collection<RowId> rowIdsToDelete = new ArrayList<>(); + Map<UUID, ByteString> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1202,14 +1226,14 @@ public class PartitionReplicaListener implements ReplicaListener { RowId lockedRowId = deleteExactLockFuts[futNum++].join(); if (lockedRowId != null) { - rowIdsToDelete.add(lockedRowId); + rowIdsToDelete.put(lockedRowId.uuid(), null); } else { result.add(row); } } - CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? completedFuture(null) - : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId)); + CompletableFuture<Object> raftFut = rowIdsToDelete.isEmpty() ? completedFuture(null) + : applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, rowIdsToDelete, txId)); return raftFut.thenApply(ignored -> result); }); @@ -1227,6 +1251,7 @@ public class PartitionReplicaListener implements ReplicaListener { return allOf(pkReadLockFuts).thenCompose(ignore -> { Collection<BinaryRow> result = new ArrayList<>(); Map<RowId, BinaryRow> rowsToInsert = new HashMap<>(); + Set<ByteBuffer> uniqueKeys = new HashSet<>(); int futNum = 0; @@ -1236,7 +1261,8 @@ public class PartitionReplicaListener implements ReplicaListener { if (lockedRow != null) { result.add(row); } else { - if (rowsToInsert.values().stream().noneMatch(row0 -> row0.keySlice().equals(row.keySlice()))) { + ByteBuffer keyToCheck = row.keySlice(); + if (uniqueKeys.add(keyToCheck)) { rowsToInsert.put(new RowId(partId), row); } else { result.add(row); @@ -1256,10 +1282,14 @@ public class PartitionReplicaListener implements ReplicaListener { insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId); } + Map<UUID, ByteString> convertedMap = rowsToInsert.entrySet().stream().collect( + Collectors.toMap( + e -> e.getKey().uuid(), + e -> new ByteString(e.getValue().byteBuffer()))); + return allOf(insertLockFuts) .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateAllCommand(committedPartitionId, rowsToInsert, txId))) - .thenApply(ignored -> result); + updateAllCommand(committedPartitionId, convertedMap, txId))).thenApply(ignored -> result); }); } case RW_UPSERT_ALL: { @@ -1280,21 +1310,21 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdFuts).thenCompose(ignore -> { - Map<RowId, BinaryRow> rowsToUpdate = new HashMap<>(); + Map<UUID, ByteString> rowsToUpdate = new HashMap<>(); int futNum = 0; for (BinaryRow row : request.binaryRows()) { RowId lockedRow = rowIdFuts[futNum++].join(); - rowsToUpdate.put(lockedRow, row); + rowsToUpdate.put(lockedRow.uuid(), new ByteString(row.byteBuffer())); } if (rowsToUpdate.isEmpty()) { return completedFuture(null); } - return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId)) + return applyCmdWithExceptionHandling(updateAllCommand(committedPartitionId, rowsToUpdate, txId)) .thenApply(ignored -> null); }); } @@ -1361,7 +1391,8 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForDelete(searchRow, rowId, txId) - .thenCompose(ignored -> applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, txId))) + .thenCompose(ignored -> applyCmdWithExceptionHandling( + updateCommand(commitPartitionId, rowId.uuid(), null, txId))) .thenApply(ignored -> true); }); } @@ -1372,7 +1403,8 @@ public class PartitionReplicaListener implements ReplicaListener { } return takeLocksForDelete(searchRow, rowId, txId) - .thenCompose(ignored -> applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, txId))) + .thenCompose(ignored -> applyCmdWithExceptionHandling( + updateCommand(commitPartitionId, rowId.uuid(), null, txId))) .thenApply(ignored -> row); }); } @@ -1388,7 +1420,8 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(false); } - return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, txId)) + return applyCmdWithExceptionHandling( + updateCommand(commitPartitionId, validatedRowId.uuid(), null, txId)) .thenApply(ignored -> true); }); }); @@ -1403,7 +1436,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForInsert(searchRow, rowId0, txId) .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateCommand(commitPartitionId, rowId0, searchRow, txId))) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId))) .thenApply(ignored -> true); }); } @@ -1419,7 +1452,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateCommand(commitPartitionId, rowId0, searchRow, txId))) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId))) .thenApply(ignored -> null); }); } @@ -1435,7 +1468,7 @@ public class PartitionReplicaListener implements ReplicaListener { return lockFut .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateCommand(commitPartitionId, rowId0, searchRow, txId))) + updateCommand(commitPartitionId, rowId0.uuid(), searchRow.byteBuffer(), txId))) .thenApply(ignored -> row); }); } @@ -1447,7 +1480,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateCommand(commitPartitionId, rowId, searchRow, txId))) + updateCommand(commitPartitionId, rowId.uuid(), searchRow.byteBuffer(), txId))) .thenApply(ignored0 -> row); }); } @@ -1459,7 +1492,7 @@ public class PartitionReplicaListener implements ReplicaListener { return takeLocksForUpdate(searchRow, rowId, txId) .thenCompose(ignored -> applyCmdWithExceptionHandling( - new UpdateCommand(commitPartitionId, rowId, searchRow, txId))) + updateCommand(commitPartitionId, rowId.uuid(), searchRow.byteBuffer(), txId))) .thenApply(ignored -> true); }); } @@ -1602,7 +1635,8 @@ public class PartitionReplicaListener implements ReplicaListener { return completedFuture(false); } - return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, newRow, txId)) + return applyCmdWithExceptionHandling( + updateCommand(commitPartitionId, validatedRowId.uuid(), newRow.byteBuffer(), txId)) .thenApply(ignored -> true); }); }); @@ -1789,4 +1823,80 @@ public class PartitionReplicaListener implements ReplicaListener { } }); } + + /** + * Compounds a RAFT group unique name. + * + * @param tblId Table identifier. + * @param partition Number of table partitions. + * @return A RAFT group name. + */ + private String partitionRaftGroupName(UUID tblId, int partition) { + return tblId + "_part_" + partition; + } + + /** + * Method to convert from {@link HybridTimestamp} object to NetworkMessage-based {@link HybridTimestampMessage} object. + * + * @param tmstmp {@link HybridTimestamp} object to convert to {@link HybridTimestampMessage}. + * @return {@link HybridTimestampMessage} object obtained from {@link HybridTimestamp}. + */ + private HybridTimestampMessage hybridTimestamp(HybridTimestamp tmstmp) { + return tmstmp != null ? msgFactory.hybridTimestampMessage() + .physical(tmstmp.getPhysical()) + .logical(tmstmp.getLogical()) + .build() + : null; + } + + /** + * Method to construct {@link UpdateCommand} object. + * + * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with. + * @param rowUuid Row UUID. + * @param rowBuf {@link ByteBuffer} representation of {@link BinaryRow}. + * @param txId Transaction ID. + * @return Constructed {@link UpdateCommand} object. + */ + private UpdateCommand updateCommand(TablePartitionId tablePartId, UUID rowUuid, ByteBuffer rowBuf, UUID txId) { + UpdateCommandBuilder bldr = msgFactory.updateCommand() + .tablePartitionId(tablePartitionId(tablePartId)) + .rowUuid(rowUuid) + .txId(txId); + + if (rowBuf != null) { + bldr.rowBuffer(new ByteString(rowBuf)); + } + + return bldr.build(); + } + + /** + * Method to construct {@link UpdateAllCommand} object. + * + * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with. + * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteString}s to be updated. + * @param txId Transaction ID. + * @return Constructed {@link UpdateAllCommand} object. + */ + private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteString> rowsToUpdate, UUID txId) { + return msgFactory.updateAllCommand() + .tablePartitionId(tablePartitionId(tablePartId)) + .rowsToUpdate(rowsToUpdate) + .txId(txId) + .build(); + } + + /** + * Method to convert from {@link TablePartitionId} object to command-based {@link TablePartitionIdMessage} object. + * + * @param tablePartId {@link TablePartitionId} object to convert to {@link TablePartitionIdMessage}. + * @return {@link TablePartitionIdMessage} object converted from argument. + */ + private TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) { + return msgFactory.tablePartitionIdMessage() + .tableId(tablePartId.getTableId()) + .partitionId(tablePartId.getPartId()) + .build(); + } } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index 8d818f7ac6..eb6f5cb9b2 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -35,18 +35,18 @@ import java.util.Objects; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; -import org.apache.ignite.internal.storage.RowId; -import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.Timestamp; +import org.apache.ignite.raft.jraft.util.ByteString; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -57,6 +57,9 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { /** Key-value marshaller for tests. */ protected static KvMarshaller<TestKey, TestValue> kvMarshaller; + /** Message factory to create messages - RAFT commands. */ + private TableMessagesFactory msgFactory = new TableMessagesFactory(); + @BeforeAll static void beforeAll() { var marshallerFactory = new ReflectionMarshallerFactory(); @@ -74,72 +77,100 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { @Test public void testUpdateCommand() throws Exception { - UpdateCommand cmd = new UpdateCommand( - new TablePartitionId(UUID.randomUUID(), 1), - new RowId(1), - binaryRow(1), - UUID.randomUUID() - ); + UpdateCommand cmd = msgFactory.updateCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(UUID.randomUUID()) + .partitionId(1) + .build() + ) + .rowUuid(Timestamp.nextVersion().toUuid()) + .rowBuffer(byteStrFromBinaryRow(1)) + .txId(UUID.randomUUID()) + .build(); UpdateCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); - assertEquals(cmd.getRowId(), readCmd.getRowId()); - assertArrayEquals(cmd.getRow().bytes(), readCmd.getRow().bytes()); + assertEquals(cmd.rowUuid(), readCmd.rowUuid()); + assertArrayEquals(cmd.rowBuffer().toByteArray(), readCmd.rowBuffer().toByteArray()); } @Test public void testRemoveCommand() throws Exception { - UpdateCommand cmd = new UpdateCommand(new TablePartitionId(UUID.randomUUID(), 1), new RowId(1), UUID.randomUUID()); + UpdateCommand cmd = msgFactory.updateCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(UUID.randomUUID()) + .partitionId(1) + .build() + ) + .rowUuid(Timestamp.nextVersion().toUuid()) + .txId(UUID.randomUUID()) + .build(); UpdateCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); - assertEquals(cmd.getRowId(), readCmd.getRowId()); - assertNull(readCmd.getRow()); + assertEquals(cmd.rowUuid(), readCmd.rowUuid()); + assertNull(readCmd.rowBuffer()); } @Test public void testUpdateAllCommand() throws Exception { - HashMap<RowId, BinaryRow> rowsToUpdate = new HashMap(); + HashMap<UUID, ByteString> rowsToUpdate = new HashMap(); for (int i = 0; i < 10; i++) { - rowsToUpdate.put(new RowId(i), binaryRow(i)); + rowsToUpdate.put(Timestamp.nextVersion().toUuid(), byteStrFromBinaryRow(i)); } - var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 1), rowsToUpdate, UUID.randomUUID()); + var cmd = msgFactory.updateAllCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(UUID.randomUUID()) + .partitionId(1) + .build() + ) + .rowsToUpdate(rowsToUpdate) + .txId(UUID.randomUUID()) + .build(); UpdateAllCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); - for (Map.Entry<RowId, BinaryRow> entry : cmd.getRowsToUpdate().entrySet()) { - assertTrue(readCmd.getRowsToUpdate().containsKey(entry.getKey())); + for (Map.Entry<UUID, ByteString> entry : cmd.rowsToUpdate().entrySet()) { + assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey())); - var readVal = readCmd.getRowsToUpdate().get(entry.getKey()); + var readVal = readCmd.rowsToUpdate().get(entry.getKey()); var val = entry.getValue(); - assertArrayEquals(val.bytes(), readVal.bytes()); + assertArrayEquals(val.toByteArray(), readVal.toByteArray()); } } @Test public void testRemoveAllCommand() throws Exception { - ArrayList<RowId> rowsToRemove = new ArrayList<>(); + Map<UUID, ByteString> rowsToRemove = new HashMap<>(); for (int i = 0; i < 10; i++) { - rowsToRemove.add(new RowId(i)); + rowsToRemove.put(Timestamp.nextVersion().toUuid(), null); } - var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 1), rowsToRemove, UUID.randomUUID()); + var cmd = msgFactory.updateAllCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(UUID.randomUUID()) + .partitionId(1) + .build() + ) + .rowsToUpdate(rowsToRemove) + .txId(UUID.randomUUID()) + .build(); UpdateAllCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); - for (RowId rowId : cmd.getRowsToUpdate().keySet()) { - assertTrue(readCmd.getRowsToUpdate().containsKey(rowId)); - assertNull(readCmd.getRowsToUpdate().get(rowId)); + for (UUID uuid : cmd.rowsToUpdate().keySet()) { + assertTrue(readCmd.rowsToUpdate().containsKey(uuid)); + assertNull(readCmd.rowsToUpdate().get(uuid)); } } @@ -147,7 +178,11 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { public void testTxCleanupCommand() throws Exception { HybridClock clock = new HybridClockImpl(); - TxCleanupCommand cmd = new TxCleanupCommand(UUID.randomUUID(), true, clock.now()); + TxCleanupCommand cmd = msgFactory.txCleanupCommand() + .txId(UUID.randomUUID()) + .commit(true) + .commitTimestamp(hybridTimestampMessage(clock.now())) + .build(); TxCleanupCommand readCmd = copyCommand(cmd); @@ -159,20 +194,35 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { @Test public void testFinishTxCommand() throws Exception { HybridClock clock = new HybridClockImpl(); - ArrayList<ReplicationGroupId> grps = new ArrayList<>(10); + ArrayList<TablePartitionIdMessage> grps = new ArrayList<>(10); for (int i = 0; i < 10; i++) { - grps.add(new TablePartitionId(UUID.randomUUID(), i)); + grps.add(msgFactory.tablePartitionIdMessage() + .tableId(UUID.randomUUID()) + .partitionId(i) + .build()); } - FinishTxCommand cmd = new FinishTxCommand(UUID.randomUUID(), true, clock.now(), grps); + FinishTxCommand cmd = msgFactory.finishTxCommand() + .txId(UUID.randomUUID()) + .commit(true) + .commitTimestamp(hybridTimestampMessage(clock.now())) + .tablePartitionIds(grps) + .build(); FinishTxCommand readCmd = copyCommand(cmd); assertEquals(cmd.txId(), readCmd.txId()); assertEquals(cmd.commit(), readCmd.commit()); assertEquals(cmd.commitTimestamp(), readCmd.commitTimestamp()); - assertEquals(cmd.replicationGroupIds(), readCmd.replicationGroupIds()); + assertEquals(cmd.tablePartitionIds(), readCmd.tablePartitionIds()); + } + + private HybridTimestampMessage hybridTimestampMessage(HybridTimestamp tmstmp) { + return msgFactory.hybridTimestampMessage() + .logical(tmstmp.getLogical()) + .physical(tmstmp.getPhysical()) + .build(); } private <T> T copyCommand(T cmd) throws Exception { @@ -199,8 +249,9 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { } } - private static BinaryRow binaryRow(int id) throws Exception { - return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))); + private static ByteString byteStrFromBinaryRow(int id) throws Exception { + return new ByteString(kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))) + .byteBuffer()); } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 8a56351b87..885f8df449 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -40,11 +40,9 @@ import java.io.Serializable; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -72,10 +70,10 @@ import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; -import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.WorkDirectory; @@ -94,6 +92,7 @@ import org.apache.ignite.raft.client.Command; import org.apache.ignite.raft.client.WriteCommand; import org.apache.ignite.raft.client.service.CommandClosure; import org.apache.ignite.raft.client.service.CommittedConfiguration; +import org.apache.ignite.raft.jraft.util.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -154,6 +153,9 @@ public class PartitionCommandListenerTest { @WorkDirectory private Path workDir; + /** Factory for command messages. */ + private TableMessagesFactory msgFactory = new TableMessagesFactory(); + @Captor private ArgumentCaptor<Throwable> commandClosureResultCaptor; @@ -174,7 +176,8 @@ public class PartitionCommandListenerTest { partitionDataStorage, txStateStorage, new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()), - () -> Map.of(pkStorage.id(), pkStorage) + () -> Map.of(pkStorage.id(), pkStorage), + PARTITION_ID ); } @@ -262,7 +265,8 @@ public class PartitionCommandListenerTest { partitionDataStorage, txStateStorage, new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()), - () -> Map.of(pkStorage.id(), pkStorage) + () -> Map.of(pkStorage.id(), pkStorage), + PARTITION_ID ); txStateStorage.lastApplied(3L, 1L); @@ -486,18 +490,35 @@ public class PartitionCommandListenerTest { * Inserts all rows. */ private void insertAll() { - HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT); + HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT); UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, i); - rows.put(new RowId(PARTITION_ID), row); + rows.put(Timestamp.nextVersion().toUuid(), new ByteString(row.byteBuffer())); } - invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId)); - invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())); + HybridTimestamp commitTimestamp = CLOCK.now(); + + invokeBatchedCommand(msgFactory.updateAllCommand() + .tablePartitionId( + msgFactory.tablePartitionIdMessage() + .tableId(commitPartId.getTableId()) + .partitionId(commitPartId.getPartId()) + .build()) + .rowsToUpdate(rows) + .txId(txId) + .build()); + invokeBatchedCommand(msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(commitTimestamp.getPhysical()) + .logical(commitTimestamp.getLogical()) + .build()) + .build()); } /** @@ -508,16 +529,33 @@ public class PartitionCommandListenerTest { private void updateAll(Function<Integer, Integer> keyValueMapper) { UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); - HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT); + HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, keyValueMapper.apply(i)); - rows.put(readRow(row), row); + rows.put(readRow(row).uuid(), new ByteString(row.byteBuffer())); } - invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId)); - invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())); + HybridTimestamp commitTimestamp = CLOCK.now(); + + invokeBatchedCommand(msgFactory.updateAllCommand() + .tablePartitionId( + msgFactory.tablePartitionIdMessage() + .tableId(commitPartId.getTableId()) + .partitionId(commitPartId.getPartId()) + .build()) + .rowsToUpdate(rows) + .txId(txId) + .build()); + invokeBatchedCommand(msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(commitTimestamp.getPhysical()) + .logical(commitTimestamp.getLogical()) + .build()) + .build()); } /** @@ -526,16 +564,33 @@ public class PartitionCommandListenerTest { private void deleteAll() { UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); - Set<RowId> keyRows = new HashSet<>(KEY_COUNT); + Map<UUID, ByteString> keyRows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, i); - keyRows.add(readRow(row)); + keyRows.put(readRow(row).uuid(), null); } - invokeBatchedCommand(new UpdateAllCommand(commitPartId, keyRows, txId)); - invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())); + HybridTimestamp commitTimestamp = CLOCK.now(); + + invokeBatchedCommand(msgFactory.updateAllCommand() + .tablePartitionId( + msgFactory.tablePartitionIdMessage() + .tableId(commitPartId.getTableId()) + .partitionId(commitPartId.getPartId()) + .build()) + .rowsToUpdate(keyRows) + .txId(txId) + .build()); + invokeBatchedCommand(msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(commitTimestamp.getPhysical()) + .logical(commitTimestamp.getLogical()) + .build()) + .build()); } /** @@ -550,7 +605,6 @@ public class PartitionCommandListenerTest { UUID txId = Timestamp.nextVersion().toUuid(); Row row = getTestRow(i, keyValueMapper.apply(i)); RowId rowId = readRow(row); - var commitPartId = new TablePartitionId(txId, PARTITION_ID); assertNotNull(rowId); @@ -558,7 +612,15 @@ public class PartitionCommandListenerTest { when(clo.index()).thenReturn(raftIndex.incrementAndGet()); - when(clo.command()).thenReturn(new UpdateCommand(commitPartId, rowId, row, txId)); + when(clo.command()).thenReturn( + msgFactory.updateCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(txId) + .partitionId(PARTITION_ID).build()) + .rowUuid(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) + .rowBuffer(new ByteString(row.byteBuffer())) + .txId(txId) + .build()); doAnswer(invocation -> { assertNull(invocation.getArgument(0)); @@ -567,7 +629,16 @@ public class PartitionCommandListenerTest { }).when(clo).result(any()); })); - txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()))); + HybridTimestamp commitTimestamp = CLOCK.now(); + + txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(commitTimestamp.getPhysical()) + .logical(commitTimestamp.getLogical()) + .build()) + .build())); } /** @@ -580,7 +651,6 @@ public class PartitionCommandListenerTest { UUID txId = Timestamp.nextVersion().toUuid(); Row row = getTestRow(i, i); RowId rowId = readRow(row); - var commitPartId = new TablePartitionId(txId, PARTITION_ID); assertNotNull(rowId); @@ -588,7 +658,14 @@ public class PartitionCommandListenerTest { when(clo.index()).thenReturn(raftIndex.incrementAndGet()); - when(clo.command()).thenReturn(new UpdateCommand(commitPartId, rowId, txId)); + when(clo.command()).thenReturn( + msgFactory.updateCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(txId) + .partitionId(PARTITION_ID).build()) + .rowUuid(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) + .txId(txId) + .build()); doAnswer(invocation -> { assertNull(invocation.getArgument(0)); @@ -597,7 +674,16 @@ public class PartitionCommandListenerTest { }).when(clo).result(any()); })); - txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()))); + HybridTimestamp commitTimestamp = CLOCK.now(); + + txIds.forEach(txId -> invokeBatchedCommand(msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(commitTimestamp.getPhysical()) + .logical(commitTimestamp.getLogical()) + .build()) + .build())); } /** @@ -648,7 +734,15 @@ public class PartitionCommandListenerTest { when(clo.index()).thenReturn(raftIndex.incrementAndGet()); - when(clo.command()).thenReturn(new UpdateCommand(commitPartId, new RowId(PARTITION_ID), row, txId)); + when(clo.command()).thenReturn( + msgFactory.updateCommand() + .tablePartitionId(msgFactory.tablePartitionIdMessage() + .tableId(txId) + .partitionId(PARTITION_ID).build()) + .rowUuid(Timestamp.nextVersion().toUuid()) + .rowBuffer(new ByteString(row.byteBuffer())) + .txId(txId) + .build()); doAnswer(invocation -> { assertNull(invocation.getArgument(0)); @@ -657,7 +751,19 @@ public class PartitionCommandListenerTest { }).when(clo).result(any()); })); - txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()))); + HybridTimestamp now = CLOCK.now(); + + + + txIds.forEach(txId -> invokeBatchedCommand( + msgFactory.txCleanupCommand() + .txId(txId) + .commit(true) + .commitTimestamp(msgFactory.hybridTimestampMessage() + .physical(now.getPhysical()) + .logical(now.getLogical()) + .build()) + .build())); } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index f25de52421..7e2e6bbc18 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -261,7 +261,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { new TestPartitionDataStorage(mvPartStorage), txStateStorage().getOrCreateTxStateStorage(0), this.txManager, - () -> Map.of(pkStorage.get().id(), pkStorage.get()) + () -> Map.of(pkStorage.get().id(), pkStorage.get()), + 0 ); }