This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 75e7464477 IGNITE-17871 JDKMarshaller replaced with OptimizedMarshaller for RAFT commands. (#1411) 75e7464477 is described below commit 75e7464477a26b4cd34af9e040c44687018689cf Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Mon Dec 12 12:46:30 2022 +0300 IGNITE-17871 JDKMarshaller replaced with OptimizedMarshaller for RAFT commands. (#1411) --- .../raft/commands/InitCmgStateCommand.java | 3 +- .../management/raft/commands/JoinReadyCommand.java | 3 +- .../raft/commands/JoinRequestCommand.java | 3 +- .../raft/commands/NodesLeaveCommand.java | 3 +- .../raft/commands/ReadLogicalTopologyCommand.java | 3 +- .../management/raft/commands/ReadStateCommand.java | 3 +- .../metastorage/common/command/GetAllCommand.java | 3 +- .../common/command/GetAndPutAllCommand.java | 3 +- .../common/command/GetAndPutCommand.java | 3 +- .../common/command/GetAndRemoveAllCommand.java | 3 +- .../common/command/GetAndRemoveCommand.java | 3 +- .../metastorage/common/command/GetCommand.java | 3 +- .../metastorage/common/command/InvokeCommand.java | 3 +- .../common/command/MultiInvokeCommand.java | 3 +- .../metastorage/common/command/PutAllCommand.java | 3 +- .../metastorage/common/command/PutCommand.java | 3 +- .../metastorage/common/command/RangeCommand.java | 3 +- .../common/command/RemoveAllCommand.java | 3 +- .../metastorage/common/command/RemoveCommand.java | 3 +- .../common/command/WatchExactKeysCommand.java | 3 +- .../common/command/WatchRangeKeysCommand.java | 3 +- .../common/command/cursor/CursorCloseCommand.java | 3 +- .../command/cursor/CursorHasNextCommand.java | 3 +- .../common/command/cursor/CursorNextCommand.java | 3 +- .../common/command/cursor/CursorsCloseCommand.java | 3 +- .../org/apache/ignite/internal/raft/Command.java | 4 +- .../raft/server/ItJraftCounterServerTest.java | 38 +++++---- .../apache/ignite/raft/server/ItSafeTimeTest.java | 4 +- .../raft/server/ItSimpleCounterServerTest.java | 20 ++--- .../raft/server/snasphot/TestWriteCommand.java | 26 ------ .../server/snasphot/UpdateCountRaftListener.java | 1 + .../internal/raft/server/impl/JraftServerImpl.java | 18 ++++- .../jraft/rpc/impl/ActionRequestProcessor.java | 10 ++- .../raft/jraft/rpc/impl/IgniteRpcServer.java | 4 +- .../ignite/internal/raft/RaftGroupServiceTest.java | 21 +++-- .../org/apache/ignite/raft/TestWriteCommand.java | 8 +- .../ignite/raft/messages/TestMessageGroup.java | 7 ++ .../raft/server/counter/GetValueCommand.java | 10 ++- .../server/counter/IncrementAndGetCommand.java | 27 +++---- .../replicator/command/SafeTimeSyncCommand.java | 5 +- .../replicator/message/ReplicaMessageGroup.java | 22 +++--- .../ignite/internal/table/ItColocationTest.java | 14 ++-- .../table/distributed/TableMessageGroup.java | 9 ++- .../distributed/command/PartitionCommand.java | 3 +- .../distributed/command/UpdateAllCommand.java | 6 +- .../table/distributed/command/UpdateCommand.java | 6 +- .../table/distributed/raft/PartitionListener.java | 10 +-- .../replicator/PartitionReplicaListener.java | 26 +++--- .../PartitionRaftCommandsSerializationTest.java | 92 +++++++++++++--------- .../raft/PartitionCommandListenerTest.java | 31 +++++--- .../incoming/IncomingSnapshotCopierTest.java | 2 +- 51 files changed, 254 insertions(+), 245 deletions(-) diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java index 3840140b87..e0c7d86a11 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/InitCmgStateCommand.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.cluster.management.raft.commands; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -28,7 +27,7 @@ import org.apache.ignite.network.annotations.Transferable; * existing state. */ @Transferable(CmgMessageGroup.Commands.INIT_CMG_STATE) -public interface InitCmgStateCommand extends WriteCommand, NetworkMessage { +public interface InitCmgStateCommand extends WriteCommand { /** * Returns the node that wants to initialize the CMG state. */ diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java index 68acd07717..3de5185f76 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinReadyCommand.java @@ -19,14 +19,13 @@ package org.apache.ignite.internal.cluster.management.raft.commands; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Command sent by a node when it's ready to join the cluster and enter the logical topology. */ @Transferable(CmgMessageGroup.Commands.JOIN_READY) -public interface JoinReadyCommand extends WriteCommand, NetworkMessage { +public interface JoinReadyCommand extends WriteCommand { /** * Returns the node that wants to enter the logical topology. */ diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java index a2a8be1a19..7b6731014e 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/JoinRequestCommand.java @@ -21,14 +21,13 @@ import org.apache.ignite.internal.cluster.management.ClusterTag; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Command sent by a node that intends to join a cluster. This command will trigger node validation. */ @Transferable(CmgMessageGroup.Commands.JOIN_REQUEST) -public interface JoinRequestCommand extends WriteCommand, NetworkMessage { +public interface JoinRequestCommand extends WriteCommand { /** * Returns the node that wants to join a cluster. */ diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java index e84f72bf6e..a5ebdb06d1 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/NodesLeaveCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.cluster.management.raft.commands; import java.util.Set; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Command that gets executed when nodes need to be removed from the logical topology. */ @Transferable(CmgMessageGroup.Commands.NODES_LEAVE) -public interface NodesLeaveCommand extends WriteCommand, NetworkMessage { +public interface NodesLeaveCommand extends WriteCommand { /** * Returns the nodes that need to be removed from the logical topology. */ diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java index 9d6851300f..161fb70d43 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadLogicalTopologyCommand.java @@ -19,12 +19,11 @@ package org.apache.ignite.internal.cluster.management.raft.commands; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.raft.ReadCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Command for retrieving the current logical topology. */ @Transferable(CmgMessageGroup.Commands.READ_LOGICAL_TOPOLOGY) -public interface ReadLogicalTopologyCommand extends ReadCommand, NetworkMessage { +public interface ReadLogicalTopologyCommand extends ReadCommand { } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java index d4f250c5c2..5a02e92b8d 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/commands/ReadStateCommand.java @@ -20,12 +20,11 @@ package org.apache.ignite.internal.cluster.management.raft.commands; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessageGroup; import org.apache.ignite.internal.raft.ReadCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Command for retrieving the current {@link ClusterState}. */ @Transferable(CmgMessageGroup.Commands.READ_STATE) -public interface ReadStateCommand extends ReadCommand, NetworkMessage { +public interface ReadStateCommand extends ReadCommand { } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java index d4351d4d1e..90eef8256c 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java @@ -22,14 +22,13 @@ import java.util.List; import java.util.Set; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Get all command for MetaStorageCommandListener that retrieves entries for given keys and the revision upper bound, if latter is present. */ @Transferable(MetastorageCommandsMessageGroup.GET_ALL) -public interface GetAllCommand extends ReadCommand, NetworkMessage { +public interface GetAllCommand extends ReadCommand { /** * Returns the list of keys. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java index 71960b841a..07cd8ebabf 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -30,7 +29,7 @@ import org.apache.ignite.network.annotations.Transferable; * previous entries for given keys. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT_ALL) -public interface GetAndPutAllCommand extends WriteCommand, NetworkMessage { +public interface GetAndPutAllCommand extends WriteCommand { /** * Returns keys. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java index 612d833262..f246785763 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable; * a previous entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_PUT) -public interface GetAndPutCommand extends WriteCommand, NetworkMessage { +public interface GetAndPutCommand extends WriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java index 11f65fe74b..d331779748 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java @@ -22,14 +22,13 @@ import java.util.List; import java.util.Set; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Get and remove all command for MetaStorageCommandListener that removes entries for given keys and retrieves previous entries. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE_ALL) -public interface GetAndRemoveAllCommand extends NetworkMessage, WriteCommand { +public interface GetAndRemoveAllCommand extends WriteCommand { /** * Returns the keys collection. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java index 02124ced93..44a2ca4f02 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable; * given key. */ @Transferable(MetastorageCommandsMessageGroup.GET_AND_REMOVE) -public interface GetAndRemoveCommand extends WriteCommand, NetworkMessage { +public interface GetAndRemoveCommand extends WriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java index 8b7113ecb3..74fc5ffa05 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java @@ -18,14 +18,13 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.ReadCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Get command for MetaStorageCommandListener that retrieves an entry for the given key and the revision upper bound, if latter is present. */ @Transferable(MetastorageCommandsMessageGroup.GET) -public interface GetCommand extends ReadCommand, NetworkMessage { +public interface GetCommand extends ReadCommand { /** * Returns key. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java index 06dc51b4af..ed0ee32186 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.common.command; import java.util.List; import org.apache.ignite.internal.metastorage.common.OperationInfo; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Represents invoke command for meta storage. */ @Transferable(MetastorageCommandsMessageGroup.INVOKE) -public interface InvokeCommand extends NetworkMessage, WriteCommand { +public interface InvokeCommand extends WriteCommand { /** * Returns condition. * diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java index 6376694688..7e62fdf2db 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultiInvokeCommand.java @@ -18,14 +18,13 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Represents invoke command with nested conditions and execution branches. */ @Transferable(MetastorageCommandsMessageGroup.MULTI_INVOKE) -public interface MultiInvokeCommand extends NetworkMessage, WriteCommand { +public interface MultiInvokeCommand extends WriteCommand { /** * Returns if statement. * diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java index 6639ffbb77..1604b85167 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java @@ -22,14 +22,13 @@ import java.util.List; import java.util.Map; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Put all command for MetaStorageCommandListener that inserts or updates entries with given keys and given values. */ @Transferable(MetastorageCommandsMessageGroup.PUT_ALL) -public interface PutAllCommand extends WriteCommand, NetworkMessage { +public interface PutAllCommand extends WriteCommand { /** * Returns entries keys. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java index 3fb838a214..a349844d9d 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -26,7 +25,7 @@ import org.apache.ignite.network.annotations.Transferable; * previous entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.PUT) -public interface PutCommand extends WriteCommand, NetworkMessage { +public interface PutCommand extends WriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java index 8993e620a4..a750163982 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -27,7 +26,7 @@ import org.apache.ignite.network.annotations.Transferable; * filtered out by upper bound of given revision number. */ @Transferable(MetastorageCommandsMessageGroup.RANGE) -public interface RangeCommand extends WriteCommand, NetworkMessage { +public interface RangeCommand extends WriteCommand { /** Default value for {@link #batchSize}. */ int DEFAULT_BATCH_SIZE = 100; diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java index 15ef2fae21..df6f5c0027 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java @@ -22,14 +22,13 @@ import java.util.List; import java.util.Set; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.ByteArray; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Remove all command for MetaStorageCommandListener that removes entries for given keys. */ @Transferable(MetastorageCommandsMessageGroup.REMOVE_ALL) -public interface RemoveAllCommand extends WriteCommand, NetworkMessage { +public interface RemoveAllCommand extends WriteCommand { /** * Returns the keys list. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java index 0ec5f56a33..0697aa6929 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java @@ -18,14 +18,13 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Remove command for MetaStorageCommandListener that removes an entry for the given key. */ @Transferable(MetastorageCommandsMessageGroup.REMOVE) -public interface RemoveCommand extends WriteCommand, NetworkMessage { +public interface RemoveCommand extends WriteCommand { /** * Returns the key. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java index 1cf5fc6290..cfca1fb686 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java @@ -23,14 +23,13 @@ import java.util.Set; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Watch command for MetaStorageCommandListener that subscribes on meta storage updates matching the parameters. */ @Transferable(MetastorageCommandsMessageGroup.WATCH_EXACT_KEYS) -public interface WatchExactKeysCommand extends NetworkMessage, WriteCommand { +public interface WatchExactKeysCommand extends WriteCommand { /** * Returns the keys list. Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java index 013745e066..269ba13d41 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; import org.jetbrains.annotations.Nullable; @@ -27,7 +26,7 @@ import org.jetbrains.annotations.Nullable; * Watch command for MetaStorageCommandListener that subscribes on meta storage updates matching the parameters. */ @Transferable(MetastorageCommandsMessageGroup.WATCH_RANGE_KEYS) -public interface WatchRangeKeysCommand extends NetworkMessage, WriteCommand { +public interface WatchRangeKeysCommand extends WriteCommand { /** * Returns start key of range (inclusive). Couldn't be {@code null}. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java index edde63db1e..17adffeae7 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.common.command.cursor; import org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Cursor close command for MetaStorageCommandListener that closes cursor with given id. */ @Transferable(MetastorageCommandsMessageGroup.CURSOR_CLOSE) -public interface CursorCloseCommand extends NetworkMessage, WriteCommand { +public interface CursorCloseCommand extends WriteCommand { /** * Returns cursor id. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java index be1ec84098..7304792b6e 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.common.command.cursor; import org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup; import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Cursor {@code hasNext} command for MetaStorageCommandListener that checks whether next element is available. */ @Transferable(MetastorageCommandsMessageGroup.CURSOR_HAS_NEXT) -public interface CursorHasNextCommand extends NetworkMessage, ReadCommand { +public interface CursorHasNextCommand extends ReadCommand { /** * Returns cursor id. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java index a5c2049937..7fb92db4f0 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java @@ -20,14 +20,13 @@ package org.apache.ignite.internal.metastorage.common.command.cursor; import org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** * Cursor {@code next} command for MetaStorageCommandListener that returns next element and moves cursor. */ @Transferable(MetastorageCommandsMessageGroup.CURSOR_NEXT) -public interface CursorNextCommand extends NetworkMessage, WriteCommand { +public interface CursorNextCommand extends WriteCommand { /** * Returns cursor id. */ diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java index 2abe0d8671..5c7132df6e 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorsCloseCommand.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command.cursor; import org.apache.ignite.internal.metastorage.common.command.MetastorageCommandsMessageGroup; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; /** @@ -27,7 +26,7 @@ import org.apache.ignite.network.annotations.Transferable; * topology. */ @Transferable(MetastorageCommandsMessageGroup.CURSORS_CLOSE) -public interface CursorsCloseCommand extends NetworkMessage, WriteCommand { +public interface CursorsCloseCommand extends WriteCommand { /** * Returns cursor id. */ diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java index dd0de21e48..570b32732f 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/Command.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal.raft; -import java.io.Serializable; +import org.apache.ignite.network.NetworkMessage; /** * A marker interface for replication group command. */ -public interface Command extends Serializable { +public interface Command extends NetworkMessage { } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java index ac356b9cd2..6a1b912558 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java @@ -25,6 +25,8 @@ import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR; import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER; import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition; import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology; +import static org.apache.ignite.raft.server.counter.GetValueCommand.getValueCommand; +import static org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -63,11 +65,11 @@ import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.rpc.impl.RaftException; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; +import org.apache.ignite.raft.messages.TestRaftMessagesFactory; import org.apache.ignite.raft.server.counter.CounterListener; import org.apache.ignite.raft.server.counter.GetValueCommand; import org.apache.ignite.raft.server.counter.IncrementAndGetCommand; import org.apache.ignite.raft.server.snasphot.SnapshotInMemoryStorageFactory; -import org.apache.ignite.raft.server.snasphot.TestWriteCommand; import org.apache.ignite.raft.server.snasphot.UpdateCountRaftListener; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -199,15 +201,15 @@ class ItJraftCounterServerTest extends JraftAbstractTest { assertNotNull(client1.leader()); assertNotNull(client2.leader()); - assertEquals(2, client1.<Long>run(new IncrementAndGetCommand(2)).get()); - assertEquals(2, client1.<Long>run(new GetValueCommand()).get()); - assertEquals(3, client1.<Long>run(new IncrementAndGetCommand(1)).get()); - assertEquals(3, client1.<Long>run(new GetValueCommand()).get()); + assertEquals(2, client1.<Long>run(incrementAndGetCommand(2)).get()); + assertEquals(2, client1.<Long>run(getValueCommand()).get()); + assertEquals(3, client1.<Long>run(incrementAndGetCommand(1)).get()); + assertEquals(3, client1.<Long>run(getValueCommand()).get()); - assertEquals(4, client2.<Long>run(new IncrementAndGetCommand(4)).get()); - assertEquals(4, client2.<Long>run(new GetValueCommand()).get()); - assertEquals(7, client2.<Long>run(new IncrementAndGetCommand(3)).get()); - assertEquals(7, client2.<Long>run(new GetValueCommand()).get()); + assertEquals(4, client2.<Long>run(incrementAndGetCommand(4)).get()); + assertEquals(4, client2.<Long>run(getValueCommand()).get()); + assertEquals(7, client2.<Long>run(incrementAndGetCommand(3)).get()); + assertEquals(7, client2.<Long>run(getValueCommand()).get()); } @Test @@ -369,7 +371,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { assertEquals(sum(9), val3); try { - client1.<Long>run(new IncrementAndGetCommand(10)).get(); + client1.<Long>run(incrementAndGetCommand(10)).get(); fail(); } catch (Exception e) { @@ -384,7 +386,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { // Client can't switch to new leader, because only one peer in the list. try { - client1.<Long>run(new IncrementAndGetCommand(11)).get(); + client1.<Long>run(incrementAndGetCommand(11)).get(); } catch (Exception e) { boolean isValid = e.getCause() instanceof TimeoutException; @@ -437,7 +439,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { assertNotNull(leader); try { - client1.<Long>run(new IncrementAndGetCommand(3)).get(); + client1.<Long>run(incrementAndGetCommand(3)).get(); fail(); } catch (Exception e) { @@ -450,7 +452,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { } try { - client1.<Long>run(new GetValueCommand()).get(); + client1.<Long>run(getValueCommand()).get(); fail(); } catch (Exception e) { @@ -573,19 +575,21 @@ class ItJraftCounterServerTest extends JraftAbstractTest { raftClient.refreshMembers(true).get(); var peers = raftClient.peers(); - raftClient.run(new TestWriteCommand()); + var testWriteCommandBuilder = new TestRaftMessagesFactory().testWriteCommand(); + + raftClient.run(testWriteCommandBuilder.build()); assertTrue(TestUtils.waitForCondition(() -> counters.get(0).get() == 1, 10_000)); raftClient.snapshot(peers.get(0)).get(); - raftClient.run(new TestWriteCommand()); + raftClient.run(testWriteCommandBuilder.build()); assertTrue(TestUtils.waitForCondition(() -> counters.get(1).get() == 2, 10_000)); raftClient.snapshot(peers.get(1)).get(); - raftClient.run(new TestWriteCommand()); + raftClient.run(testWriteCommandBuilder.build()); for (AtomicInteger counter : counters.values()) { assertTrue(TestUtils.waitForCondition(() -> counter.get() == 3, 10_000)); @@ -767,7 +771,7 @@ class ItJraftCounterServerTest extends JraftAbstractTest { long val = 0; for (int i = start; i <= stop; i++) { - val = client.<Long>run(new IncrementAndGetCommand(i)).get(); + val = client.<Long>run(incrementAndGetCommand(i)).get(); logger().info("Val={}, i={}", val, i); } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java index c98c6b5e76..fad2f441e5 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSafeTimeTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.raft.server; import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition; +import static org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -33,7 +34,6 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.raft.server.counter.CounterListener; -import org.apache.ignite.raft.server.counter.IncrementAndGetCommand; import org.junit.jupiter.api.Test; /** @@ -102,7 +102,7 @@ public class ItSafeTimeTest extends JraftAbstractTest { clocks.get(leaderIndex).update(new HybridTimestamp(leaderPhysicalTime, 0)); - client1.run(new IncrementAndGetCommand(1)).get(); + client1.run(incrementAndGetCommand(1)).get(); waitForCondition(() -> { for (int i = 0; i < NODES; i++) { diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java index f2b3967160..e8ac6b4a1d 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java @@ -19,6 +19,8 @@ package org.apache.ignite.raft.server; import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology; +import static org.apache.ignite.raft.server.counter.GetValueCommand.getValueCommand; +import static org.apache.ignite.raft.server.counter.IncrementAndGetCommand.incrementAndGetCommand; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -40,8 +42,6 @@ import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.server.counter.CounterListener; -import org.apache.ignite.raft.server.counter.GetValueCommand; -import org.apache.ignite.raft.server.counter.IncrementAndGetCommand; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -162,14 +162,14 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest { assertNotNull(client1.leader()); assertNotNull(client2.leader()); - assertEquals(2, client1.<Long>run(new IncrementAndGetCommand(2)).get()); - assertEquals(2, client1.<Long>run(new GetValueCommand()).get()); - assertEquals(3, client1.<Long>run(new IncrementAndGetCommand(1)).get()); - assertEquals(3, client1.<Long>run(new GetValueCommand()).get()); + assertEquals(2, client1.<Long>run(incrementAndGetCommand(2)).get()); + assertEquals(2, client1.<Long>run(getValueCommand()).get()); + assertEquals(3, client1.<Long>run(incrementAndGetCommand(1)).get()); + assertEquals(3, client1.<Long>run(getValueCommand()).get()); - assertEquals(4, client2.<Long>run(new IncrementAndGetCommand(4)).get()); - assertEquals(4, client2.<Long>run(new GetValueCommand()).get()); - assertEquals(7, client2.<Long>run(new IncrementAndGetCommand(3)).get()); - assertEquals(7, client2.<Long>run(new GetValueCommand()).get()); + assertEquals(4, client2.<Long>run(incrementAndGetCommand(4)).get()); + assertEquals(4, client2.<Long>run(getValueCommand()).get()); + assertEquals(7, client2.<Long>run(incrementAndGetCommand(3)).get()); + assertEquals(7, client2.<Long>run(getValueCommand()).get()); } } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java deleted file mode 100644 index b154b64cb3..0000000000 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/TestWriteCommand.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.raft.server.snasphot; - -import org.apache.ignite.internal.raft.WriteCommand; - -/** - * Test write command. - */ -public class TestWriteCommand implements WriteCommand { -} diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java index ceec8b5fa0..d81b6d9575 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/snasphot/UpdateCountRaftListener.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.raft.ReadCommand; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; +import org.apache.ignite.raft.TestWriteCommand; /** * The RAFT state machine counts applied write commands and stores the result into {@link java.util.concurrent.atomic.AtomicLong} that is diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 6df4e505a4..6e49bae9bc 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.lang.IgniteInternalException; @@ -76,7 +77,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy; -import org.apache.ignite.raft.jraft.util.JDKMarshaller; +import org.apache.ignite.raft.jraft.util.Marshaller; import org.apache.ignite.raft.jraft.util.Utils; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -113,6 +114,9 @@ public class JraftServerImpl implements RaftServer { /** Request executor. */ private ExecutorService requestExecutor; + /** Marshaller for RAFT commands. */ + private final Marshaller commandsMarshaller; + /** The number of parallel raft groups starts. */ private static final int SIMULTANEOUS_GROUP_START_PARALLELISM = Math.min(Utils.cpus() * 3, 25); @@ -168,6 +172,8 @@ public class JraftServerImpl implements RaftServer { } startGroupInProgressMonitors = Collections.unmodifiableList(monitors); + + commandsMarshaller = new ThreadLocalOptimizedMarshaller(service.localConfiguration().getSerializationRegistry()); } /** {@inheritDoc} */ @@ -387,7 +393,7 @@ public class JraftServerImpl implements RaftServer { nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString()); - nodeOptions.setFsm(new DelegatingStateMachine(lsnr)); + nodeOptions.setFsm(new DelegatingStateMachine(lsnr, commandsMarshaller)); nodeOptions.setRaftGrpEvtsLsnr(new RaftGroupEventsListenerAdapter(evLsnr)); @@ -519,13 +525,17 @@ public class JraftServerImpl implements RaftServer { public static class DelegatingStateMachine extends StateMachineAdapter { private final RaftGroupListener listener; + private final Marshaller marshaller; + /** * Constructor. * * @param listener The listener. + * @param marshaller Marshaller. */ - DelegatingStateMachine(RaftGroupListener listener) { + DelegatingStateMachine(RaftGroupListener listener, Marshaller marshaller) { this.listener = listener; + this.marshaller = marshaller; } public RaftGroupListener getListener() { @@ -547,7 +557,7 @@ public class JraftServerImpl implements RaftServer { @Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done(); ByteBuffer data = iter.getData(); - WriteCommand command = done == null ? JDKMarshaller.DEFAULT.unmarshall(data.array()) : done.command(); + WriteCommand command = done == null ? marshaller.unmarshall(data.array()) : done.command(); long commandIndex = iter.getIndex(); long commandTerm = iter.getTerm(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java index 0d0a29ec5c..b966f6e275 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/ActionRequestProcessor.java @@ -42,7 +42,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcContext; import org.apache.ignite.raft.jraft.rpc.RpcProcessor; import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.util.BytesUtil; -import org.apache.ignite.raft.jraft.util.JDKMarshaller; +import org.apache.ignite.raft.jraft.util.Marshaller; /** * Process action request. @@ -54,9 +54,12 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { private final RaftMessagesFactory factory; - public ActionRequestProcessor(Executor executor, RaftMessagesFactory factory) { + private final Marshaller commandsMarshaller; + + public ActionRequestProcessor(Executor executor, RaftMessagesFactory factory, Marshaller commandsMarshaller) { this.executor = executor; this.factory = factory; + this.commandsMarshaller = commandsMarshaller; } /** {@inheritDoc} */ @@ -83,8 +86,7 @@ public class ActionRequestProcessor implements RpcProcessor<ActionRequest> { * @param rpcCtx The context. */ private void applyWrite(Node node, ActionRequest request, RpcContext rpcCtx) { - // TODO asch get rid of JDK marshaller IGNITE-14832 - node.apply(new Task(ByteBuffer.wrap(JDKMarshaller.DEFAULT.marshall(request.command())), + node.apply(new Task(ByteBuffer.wrap(commandsMarshaller.marshall(request.command())), new CommandClosureImpl<>(request.command()) { @Override public void result(Serializable res) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java index 3326f533c5..64bf8eb84c 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; @@ -114,7 +115,8 @@ public class IgniteRpcServer implements RpcServer<Void> { registerProcessor(new RemoveLearnersRequestProcessor(rpcExecutor, raftMessagesFactory)); registerProcessor(new ResetLearnersRequestProcessor(rpcExecutor, raftMessagesFactory)); // common client integration - registerProcessor(new ActionRequestProcessor(rpcExecutor, raftMessagesFactory)); + var commandsMarshaller = new ThreadLocalOptimizedMarshaller(service.localConfiguration().getSerializationRegistry()); + registerProcessor(new ActionRequestProcessor(rpcExecutor, raftMessagesFactory, commandsMarshaller)); var messageHandler = new RpcMessageHandler(); diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java index d89c7072cb..4862af2381 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java @@ -23,6 +23,7 @@ import static java.util.stream.Collectors.toUnmodifiableList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand; import static org.apache.ignite.raft.jraft.test.TestUtils.peersToIds; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -58,6 +59,7 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.network.TopologyService; +import org.apache.ignite.raft.TestWriteCommand; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.entity.PeerId; @@ -204,7 +206,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { assertThat(service.refreshLeader(), willCompleteSuccessfully()); - assertThat(service.run(new TestCommand()), willBe(instanceOf(TestResponse.class))); + assertThat(service.run(testWriteCommand()), willBe(instanceOf(TestResponse.class))); } @Test @@ -216,7 +218,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { assertNull(service.leader()); - assertThat(service.run(new TestCommand()), willBe(instanceOf(TestResponse.class))); + assertThat(service.run(testWriteCommand()), willBe(instanceOf(TestResponse.class))); assertEquals(leader, service.leader()); } @@ -228,7 +230,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { RaftGroupService service = startRaftGroupService(NODES, false); - assertThat(service.run(new TestCommand()), willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS)); + assertThat(service.run(testWriteCommand()), willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS)); } @Test @@ -246,7 +248,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { assertEquals(leader, service.leader()); - assertThat(service.run(new TestCommand()), willThrow(TimeoutException.class)); + assertThat(service.run(testWriteCommand()), willThrow(TimeoutException.class)); } @Test @@ -266,7 +268,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { executor.schedule((Runnable) () -> this.leader = NODES.get(0), 500, TimeUnit.MILLISECONDS); - assertThat(service.run(new TestCommand()), willBe(instanceOf(TestResponse.class))); + assertThat(service.run(testWriteCommand()), willBe(instanceOf(TestResponse.class))); assertEquals(NODES.get(0), service.leader()); } @@ -295,7 +297,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { 500, TimeUnit.MILLISECONDS ); - assertThat(service.run(new TestCommand()), willBe(instanceOf(TestResponse.class))); + assertThat(service.run(testWriteCommand()), willBe(instanceOf(TestResponse.class))); assertEquals(NODES.get(1), service.leader()); } @@ -319,7 +321,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { assertNotEquals(leader, newLeader); // Runs the command on an old leader. It should respond with leader changed error, when transparently retry. - assertThat(service.run(new TestCommand()), willBe(instanceOf(TestResponse.class))); + assertThat(service.run(testWriteCommand()), willBe(instanceOf(TestResponse.class))); assertEquals(newLeader, service.leader()); } @@ -607,7 +609,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { argThat(new ArgumentMatcher<ActionRequest>() { @Override public boolean matches(ActionRequest arg) { - return arg.command() instanceof TestCommand; + return arg.command() instanceof TestWriteCommand; } }), anyLong() @@ -701,9 +703,6 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { } - private static class TestCommand implements WriteCommand { - } - private static class TestResponse { } diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java index 92a364da1a..502f86bbf5 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/TestWriteCommand.java @@ -18,13 +18,17 @@ package org.apache.ignite.raft; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.raft.messages.TestRaftMessagesFactory; /** * Test WriteCommand. */ @Transferable(10) -public interface TestWriteCommand extends NetworkMessage, WriteCommand { +public interface TestWriteCommand extends WriteCommand { String value(); + + static TestWriteCommand testWriteCommand() { + return new TestRaftMessagesFactory().testWriteCommand().build(); + } } diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java index 6dbca8cdef..c8530ab39f 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/messages/TestMessageGroup.java @@ -18,10 +18,17 @@ package org.apache.ignite.raft.messages; import org.apache.ignite.network.annotations.MessageGroup; +import org.apache.ignite.raft.server.counter.GetValueCommand; +import org.apache.ignite.raft.server.counter.IncrementAndGetCommand; /** * Message group for tests. */ @MessageGroup(groupType = 4, groupName = "TestRaftMessages") public interface TestMessageGroup { + /** Message type for {@link GetValueCommand}. */ + short GET_VALUE_COMMAND = 1000; + + /** Message type for {@link IncrementAndGetCommand}. */ + short INCREMENT_AND_GET_COMMAND = 1001; } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java similarity index 64% rename from modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java rename to modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java index 028b5bf46d..5d319f096d 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/GetValueCommand.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/GetValueCommand.java @@ -18,9 +18,17 @@ package org.apache.ignite.raft.server.counter; import org.apache.ignite.internal.raft.ReadCommand; +import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.raft.messages.TestMessageGroup; +import org.apache.ignite.raft.messages.TestRaftMessagesFactory; /** * Get a value command. */ -public class GetValueCommand implements ReadCommand { +//TODO IGNITE-18357 Move to integration test directory when Maven build is not supported anymore. +@Transferable(TestMessageGroup.GET_VALUE_COMMAND) +public interface GetValueCommand extends ReadCommand { + static GetValueCommand getValueCommand() { + return new TestRaftMessagesFactory().getValueCommand().build(); + } } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java similarity index 63% rename from modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java rename to modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java index ebc6bb9434..cdc6c46fba 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/server/counter/IncrementAndGetCommand.java @@ -18,29 +18,22 @@ package org.apache.ignite.raft.server.counter; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.network.annotations.Transferable; +import org.apache.ignite.raft.messages.TestMessageGroup; +import org.apache.ignite.raft.messages.TestRaftMessagesFactory; /** * Increment and get command. */ -public class IncrementAndGetCommand implements WriteCommand { - /** - * The delta. - */ - private final long delta; - - /** - * Constructor. - * - * @param delta The delta. - */ - public IncrementAndGetCommand(long delta) { - this.delta = delta; - } - +//TODO IGNITE-18357 Move to integration test directory when Maven build is not supported anymore. +@Transferable(TestMessageGroup.INCREMENT_AND_GET_COMMAND) +public interface IncrementAndGetCommand extends WriteCommand { /** * Returns the delta. */ - public long delta() { - return delta; + long delta(); + + static IncrementAndGetCommand incrementAndGetCommand(long delta) { + return new TestRaftMessagesFactory().incrementAndGetCommand().delta(delta).build(); } } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java index dc9df4e5a4..86549c26e7 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/command/SafeTimeSyncCommand.java @@ -18,9 +18,12 @@ package org.apache.ignite.internal.replicator.command; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; +import org.apache.ignite.network.annotations.Transferable; /** * Write command to synchronize safe time periodically. */ -public class SafeTimeSyncCommand implements WriteCommand { +@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_COMMAND) +public interface SafeTimeSyncCommand extends WriteCommand { } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java index 62e4aa1ddb..d01089d5f1 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaMessageGroup.java @@ -17,34 +17,38 @@ package org.apache.ignite.internal.replicator.message; +import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.network.annotations.MessageGroup; /** * Message group for the replication process. */ @MessageGroup(groupType = 8, groupName = "ReplicaMessages") -public class ReplicaMessageGroup { +public interface ReplicaMessageGroup { /** Message type for {@link ErrorReplicaResponse}. */ - public static final short ERROR_REPLICA_RESPONSE = 1; + short ERROR_REPLICA_RESPONSE = 1; /** Message type for {@link ReplicaResponse}. */ - public static final short REPLICA_RESPONSE = 2; + short REPLICA_RESPONSE = 2; /** Message type for {@link TimestampAware}. */ - public static final short TIMESTAMP_AWARE = 3; + short TIMESTAMP_AWARE = 3; /** Message type for {@link ErrorTimestampAwareReplicaResponse}. */ - public static final short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4; + short ERROR_TIMESTAMP_AWARE_REPLICA_RESPONSE = 4; /** Message type for {@link TimestampAwareReplicaResponse}. */ - public static final short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5; + short TIMESTAMP_AWARE_REPLICA_RESPONSE = 5; /** Message type for {@link ReplicaSafeTimeSyncRequest}. */ - public static final short SAFE_TIME_SYNC_REQUEST = 6; + short SAFE_TIME_SYNC_REQUEST = 6; /** Message type for {@link AwaitReplicaRequest}. */ - public static final short AWAIT_REPLICA_REQUEST = 7; + short AWAIT_REPLICA_REQUEST = 7; /** Message type for {@link AwaitReplicaResponse}. */ - public static final short AWAIT_REPLICA_RESPONSE = 8; + short AWAIT_REPLICA_RESPONSE = 8; + + /** Message type for {@link SafeTimeSyncCommand}. */ + short SAFE_TIME_SYNC_COMMAND = 40; } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 4da5b96154..629018f065 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -35,6 +35,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.schema.Column; @@ -92,7 +94,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -190,10 +191,9 @@ public class ItColocationTest { RaftGroupService r = groupRafts.get(request.groupId()); if (request instanceof ReadWriteMultiRowReplicaRequest) { - Map<UUID, ByteString> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRows() + Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRows() .stream() - .collect(toMap( - row -> Timestamp.nextVersion().toUuid(), row -> new ByteString(row.byteBuffer()))); + .collect(toMap(row -> Timestamp.nextVersion().toUuid(), BinaryRow::byteBuffer)); return r.run(MSG_FACTORY.updateAllCommand() .tablePartitionId(MSG_FACTORY.tablePartitionIdMessage() @@ -215,7 +215,7 @@ public class ItColocationTest { .build() ) .rowUuid(Timestamp.nextVersion().toUuid()) - .rowBuffer(new ByteString(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer())) + .rowBuffer(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer()) .txId(UUID.randomUUID()) .build()); } @@ -388,8 +388,8 @@ public class ItColocationTest { UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set); assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set); - cmd.rowsToUpdate().values().forEach(byteStr -> { - Row r = new Row(schema, new ByteBufferRow(byteStr.toByteArray())); + cmd.rowsToUpdate().values().forEach(byteBuffer -> { + Row r = new Row(schema, new ByteBufferRow(byteBuffer)); assertEquals(INT_TABLE.partition(r), p); }); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 2a4f6f96c8..7955b34108 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.table.distributed; +import static org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE; + import org.apache.ignite.internal.table.distributed.command.FinishTxCommand; import org.apache.ignite.internal.table.distributed.command.HybridTimestampMessage; import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; @@ -45,8 +47,11 @@ import org.apache.ignite.network.annotations.MessageGroup; /** * Message group for the table module. */ -@MessageGroup(groupType = 9, groupName = "TableMessages") +@MessageGroup(groupType = GROUP_TYPE, groupName = "TableMessages") public interface TableMessageGroup { + /** Table message group type. */ + short GROUP_TYPE = 9; + /** * Message type for {@link ReadWriteSingleRowReplicaRequest}. */ @@ -135,7 +140,7 @@ public interface TableMessageGroup { /** * Message types for Table module RAFT commands. */ - public interface Commands { + interface Commands { /** Message type for {@link FinishTxCommand}. */ short FINISH_TX = 40; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java index 40ea915476..33c7547567 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/PartitionCommand.java @@ -19,12 +19,11 @@ package org.apache.ignite.internal.table.distributed.command; import java.util.UUID; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.network.NetworkMessage; /** * Partition transactional command. */ -public interface PartitionCommand extends WriteCommand, NetworkMessage { +public interface PartitionCommand extends WriteCommand { /** * Returns a transaction id. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java index 46e112f05c..9654deff74 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java @@ -17,12 +17,11 @@ package org.apache.ignite.internal.table.distributed.command; +import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.table.distributed.TableMessageGroup; -import org.apache.ignite.network.annotations.Marshallable; import org.apache.ignite.network.annotations.Transferable; -import org.apache.ignite.raft.jraft.util.ByteString; /** * State machine command for updating a batch of entries. @@ -31,6 +30,5 @@ import org.apache.ignite.raft.jraft.util.ByteString; public interface UpdateAllCommand extends PartitionCommand { TablePartitionIdMessage tablePartitionId(); - @Marshallable - Map<UUID, ByteString> rowsToUpdate(); + Map<UUID, ByteBuffer> rowsToUpdate(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java index 468a5c8f4d..ab9746eaac 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java @@ -17,11 +17,10 @@ package org.apache.ignite.internal.table.distributed.command; +import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.internal.table.distributed.TableMessageGroup; -import org.apache.ignite.network.annotations.Marshallable; import org.apache.ignite.network.annotations.Transferable; -import org.apache.ignite.raft.jraft.util.ByteString; /** * State machine command to update a row specified by a row id. @@ -32,6 +31,5 @@ public interface UpdateCommand extends PartitionCommand { UUID rowUuid(); - @Marshallable - ByteString rowBuffer(); + ByteBuffer rowBuffer(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 1439cd9424..47d872a56e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_UNEXPECTED_STATE_ERR; import static org.apache.ignite.lang.IgniteStringFormatter.format; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; @@ -60,7 +61,6 @@ import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.lang.IgniteInternalException; -import org.apache.ignite.raft.jraft.util.ByteString; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -184,7 +184,7 @@ public class PartitionListener implements RaftGroupListener { } storage.runConsistently(() -> { - BinaryRow row = cmd.rowBuffer() != null ? new ByteBufferRow(cmd.rowBuffer().toByteArray()) : null; + BinaryRow row = cmd.rowBuffer() != null ? new ByteBufferRow(cmd.rowBuffer()) : null; UUID rowUuid = cmd.rowUuid(); RowId rowId = new RowId(partitionId, rowUuid); UUID txId = cmd.txId(); @@ -218,14 +218,14 @@ public class PartitionListener implements RaftGroupListener { storage.runConsistently(() -> { UUID txId = cmd.txId(); - Map<UUID, ByteString> rowsToUpdate = cmd.rowsToUpdate(); + Map<UUID, ByteBuffer> rowsToUpdate = cmd.rowsToUpdate(); UUID commitTblId = cmd.tablePartitionId().tableId(); int commitPartId = cmd.tablePartitionId().partitionId(); if (!nullOrEmpty(rowsToUpdate)) { - for (Map.Entry<UUID, ByteString> entry : rowsToUpdate.entrySet()) { + for (Map.Entry<UUID, ByteBuffer> entry : rowsToUpdate.entrySet()) { RowId rowId = new RowId(partitionId, entry.getKey()); - BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue().toByteArray()) : null; + BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null; storage.addWrite(rowId, row, txId, commitTblId, commitPartId); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 480bc8d36a..00a836e667 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -52,12 +52,12 @@ import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException; import org.apache.ignite.internal.replicator.listener.ReplicaListener; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.schema.BinaryRow; @@ -111,7 +111,6 @@ import org.apache.ignite.lang.ErrorGroups.Replicator; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.raft.jraft.util.ByteString; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -120,6 +119,9 @@ public class PartitionReplicaListener implements ReplicaListener { /** Factory to create RAFT command messages. */ private final TableMessagesFactory msgFactory = new TableMessagesFactory(); + /** Factory for creating replica command messages. */ + private final ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory(); + /** Tx messages factory. */ private static final TxMessagesFactory FACTORY = new TxMessagesFactory(); @@ -505,7 +507,7 @@ public class PartitionReplicaListener implements ReplicaListener { * @return Future. */ private CompletionStage<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) { - return raftClient.run(new SafeTimeSyncCommand()); + return raftClient.run(replicaMessagesFactory.safeTimeSyncCommand().build()); } /** @@ -1159,7 +1161,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdLockFuts).thenCompose(ignore -> { - Map<UUID, ByteString> rowIdsToDelete = new HashMap<>(); + Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1198,7 +1200,7 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(deleteExactLockFuts).thenCompose(ignore -> { - Map<UUID, ByteString> rowIdsToDelete = new HashMap<>(); + Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>(); Collection<BinaryRow> result = new ArrayList<>(); int futNum = 0; @@ -1263,10 +1265,10 @@ public class PartitionReplicaListener implements ReplicaListener { insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId); } - Map<UUID, ByteString> convertedMap = rowsToInsert.entrySet().stream().collect( + Map<UUID, ByteBuffer> convertedMap = rowsToInsert.entrySet().stream().collect( Collectors.toMap( e -> e.getKey().uuid(), - e -> new ByteString(e.getValue().byteBuffer()))); + e -> e.getValue().byteBuffer())); return allOf(insertLockFuts) .thenCompose(ignored -> applyCmdWithExceptionHandling( @@ -1300,14 +1302,14 @@ public class PartitionReplicaListener implements ReplicaListener { } return allOf(rowIdFuts).thenCompose(ignore -> { - Map<UUID, ByteString> rowsToUpdate = new HashMap<>(); + Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>(); int futNum = 0; for (BinaryRow row : request.binaryRows()) { RowId lockedRow = rowIdFuts[futNum++].join().get1(); - rowsToUpdate.put(lockedRow.uuid(), new ByteString(row.byteBuffer())); + rowsToUpdate.put(lockedRow.uuid(), row.byteBuffer()); } if (rowsToUpdate.isEmpty()) { @@ -1911,7 +1913,7 @@ public class PartitionReplicaListener implements ReplicaListener { .txId(txId); if (rowBuf != null) { - bldr.rowBuffer(new ByteString(rowBuf)); + bldr.rowBuffer(rowBuf); } return bldr.build(); @@ -1921,11 +1923,11 @@ public class PartitionReplicaListener implements ReplicaListener { * Method to construct {@link UpdateAllCommand} object. * * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with. - * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteString}s to be updated. + * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated. * @param txId Transaction ID. * @return Constructed {@link UpdateAllCommand} object. */ - private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteString> rowsToUpdate, UUID txId) { + private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) { return msgFactory.updateAllCommand() .tablePartitionId(tablePartitionId(tablePartId)) .rowsToUpdate(rowsToUpdate) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java index eb6f5cb9b2..7d885903f9 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java @@ -17,16 +17,12 @@ package org.apache.ignite.internal.table.distributed.command; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.Locale; @@ -36,17 +32,18 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tostring.IgniteToStringInclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.Timestamp; -import org.apache.ignite.raft.jraft.util.ByteString; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -84,7 +81,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { .build() ) .rowUuid(Timestamp.nextVersion().toUuid()) - .rowBuffer(byteStrFromBinaryRow(1)) + .rowBuffer(byteBufferFromBinaryRow(1)) .txId(UUID.randomUUID()) .build(); @@ -92,7 +89,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { assertEquals(cmd.txId(), readCmd.txId()); assertEquals(cmd.rowUuid(), readCmd.rowUuid()); - assertArrayEquals(cmd.rowBuffer().toByteArray(), readCmd.rowBuffer().toByteArray()); + assertEquals(cmd.rowBuffer(), readCmd.rowBuffer()); } @Test @@ -116,10 +113,10 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { @Test public void testUpdateAllCommand() throws Exception { - HashMap<UUID, ByteString> rowsToUpdate = new HashMap(); + Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>(); for (int i = 0; i < 10; i++) { - rowsToUpdate.put(Timestamp.nextVersion().toUuid(), byteStrFromBinaryRow(i)); + rowsToUpdate.put(Timestamp.nextVersion().toUuid(), byteBufferFromBinaryRow(i)); } var cmd = msgFactory.updateAllCommand() @@ -136,19 +133,19 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { assertEquals(cmd.txId(), readCmd.txId()); - for (Map.Entry<UUID, ByteString> entry : cmd.rowsToUpdate().entrySet()) { + for (Map.Entry<UUID, ByteBuffer> entry : cmd.rowsToUpdate().entrySet()) { assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey())); var readVal = readCmd.rowsToUpdate().get(entry.getKey()); var val = entry.getValue(); - assertArrayEquals(val.toByteArray(), readVal.toByteArray()); + assertEquals(val, readVal); } } @Test public void testRemoveAllCommand() throws Exception { - Map<UUID, ByteString> rowsToRemove = new HashMap<>(); + Map<UUID, ByteBuffer> rowsToRemove = new HashMap<>(); for (int i = 0; i < 10; i++) { rowsToRemove.put(Timestamp.nextVersion().toUuid(), null); @@ -225,33 +222,52 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest { .build(); } - private <T> T copyCommand(T cmd) throws Exception { - return cmdFromBytes(cmdToBytes(cmd)); - } - - private <T> T cmdFromBytes(byte[] bytes) throws IOException, ClassNotFoundException { - try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { - try (ObjectInputStream ois = new ObjectInputStream(bais)) { - return (T) ois.readObject(); - } - } - } - - private <T> byte[] cmdToBytes(T cmd) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { - oos.writeObject(cmd); - } - - baos.flush(); - - return baos.toByteArray(); + private <T extends Command> T copyCommand(T cmd) { + assertEquals(TableMessageGroup.GROUP_TYPE, cmd.groupType()); + + if (cmd instanceof FinishTxCommand) { + FinishTxCommand finishTxCommand = (FinishTxCommand) cmd; + + return (T) msgFactory.finishTxCommand() + .txId(finishTxCommand.txId()) + .commit(finishTxCommand.commit()) + .tablePartitionIds(finishTxCommand.tablePartitionIds()) + .commitTimestamp(finishTxCommand.commitTimestamp()) + .build(); + } else if (cmd instanceof TxCleanupCommand) { + TxCleanupCommand txCleanupCommand = (TxCleanupCommand) cmd; + + return (T) msgFactory.txCleanupCommand() + .txId(txCleanupCommand.txId()) + .commit(txCleanupCommand.commit()) + .commitTimestamp(txCleanupCommand.commitTimestamp()) + .build(); + } else if (cmd instanceof UpdateCommand) { + UpdateCommand updateCommand = (UpdateCommand) cmd; + + return (T) msgFactory.updateCommand() + .txId(updateCommand.txId()) + .rowUuid(updateCommand.rowUuid()) + .tablePartitionId(updateCommand.tablePartitionId()) + .rowBuffer(updateCommand.rowBuffer()) + .build(); + } else if (cmd instanceof UpdateAllCommand) { + UpdateAllCommand updateCommand = (UpdateAllCommand) cmd; + + return (T) msgFactory.updateAllCommand() + .txId(updateCommand.txId()) + .rowsToUpdate(updateCommand.rowsToUpdate()) + .tablePartitionId(updateCommand.tablePartitionId()) + .build(); + } else { + fail(cmd.toString()); + + return null; } } - private static ByteString byteStrFromBinaryRow(int id) throws Exception { - return new ByteString(kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))) - .byteBuffer()); + private static ByteBuffer byteBufferFromBinaryRow(int id) throws Exception { + return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))).byteBuffer(); } /** diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index 6097adac4c..d688b11635 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.Serializable; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -58,6 +59,8 @@ import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.CommittedConfiguration; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; +import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommandBuilder; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTupleSchema; @@ -92,7 +95,6 @@ import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkAddress; -import org.apache.ignite.raft.jraft.util.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -336,8 +338,10 @@ public class PartitionCommandListenerTest { @Test void updatesLastAppliedForSafeTimeSyncCommands() { + SafeTimeSyncCommand safeTimeSyncCommand = new ReplicaMessagesFactory().safeTimeSyncCommand().build(); + commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 2, new SafeTimeSyncCommand(), commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, safeTimeSyncCommand, commandClosureResultCaptor) ).iterator()); verify(mvPartitionStorage).lastApplied(3, 2); @@ -345,8 +349,10 @@ public class PartitionCommandListenerTest { @Test void locksOnCommandApplication() { + SafeTimeSyncCommandBuilder safeTimeSyncCommand = new ReplicaMessagesFactory().safeTimeSyncCommand(); + commandListener.onWrite(List.of( - writeCommandCommandClosure(3, 2, new SafeTimeSyncCommand(), commandClosureResultCaptor) + writeCommandCommandClosure(3, 2, safeTimeSyncCommand.build(), commandClosureResultCaptor) ).iterator()); InOrder inOrder = inOrder(partitionDataStorage); @@ -490,14 +496,14 @@ public class PartitionCommandListenerTest { * Inserts all rows. */ private void insertAll() { - HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT); + Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT); UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, i); - rows.put(Timestamp.nextVersion().toUuid(), new ByteString(row.byteBuffer())); + rows.put(Timestamp.nextVersion().toUuid(), row.byteBuffer()); } HybridTimestamp commitTimestamp = CLOCK.now(); @@ -529,12 +535,12 @@ public class PartitionCommandListenerTest { private void updateAll(Function<Integer, Integer> keyValueMapper) { UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); - HashMap<UUID, ByteString> rows = new HashMap<>(KEY_COUNT); + Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, keyValueMapper.apply(i)); - rows.put(readRow(row).uuid(), new ByteString(row.byteBuffer())); + rows.put(readRow(row).uuid(), row.byteBuffer()); } HybridTimestamp commitTimestamp = CLOCK.now(); @@ -564,7 +570,7 @@ public class PartitionCommandListenerTest { private void deleteAll() { UUID txId = Timestamp.nextVersion().toUuid(); var commitPartId = new TablePartitionId(txId, PARTITION_ID); - Map<UUID, ByteString> keyRows = new HashMap<>(KEY_COUNT); + Map<UUID, ByteBuffer> keyRows = new HashMap<>(KEY_COUNT); for (int i = 0; i < KEY_COUNT; i++) { Row row = getTestRow(i, i); @@ -617,8 +623,8 @@ public class PartitionCommandListenerTest { .tablePartitionId(msgFactory.tablePartitionIdMessage() .tableId(txId) .partitionId(PARTITION_ID).build()) - .rowUuid(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) - .rowBuffer(new ByteString(row.byteBuffer())) + .rowUuid(rowId.uuid()) + .rowBuffer(row.byteBuffer()) .txId(txId) .build()); @@ -663,7 +669,7 @@ public class PartitionCommandListenerTest { .tablePartitionId(msgFactory.tablePartitionIdMessage() .tableId(txId) .partitionId(PARTITION_ID).build()) - .rowUuid(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) + .rowUuid(rowId.uuid()) .txId(txId) .build()); @@ -729,7 +735,6 @@ public class PartitionCommandListenerTest { commandListener.onWrite(iterator((i, clo) -> { UUID txId = Timestamp.nextVersion().toUuid(); Row row = getTestRow(i, i); - var commitPartId = new TablePartitionId(txId, PARTITION_ID); txIds.add(txId); when(clo.index()).thenReturn(raftIndex.incrementAndGet()); @@ -740,7 +745,7 @@ public class PartitionCommandListenerTest { .tableId(txId) .partitionId(PARTITION_ID).build()) .rowUuid(Timestamp.nextVersion().toUuid()) - .rowBuffer(new ByteString(row.byteBuffer())) + .rowBuffer(row.byteBuffer()) .txId(txId) .build()); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 17475d1db2..4ea1136f24 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -355,7 +355,7 @@ public class IncomingSnapshotCopierTest { responseEntries.add( TABLE_MSG_FACTORY.responseEntry() - .rowId(new UUID(rowId.mostSignificantBits(), rowId.leastSignificantBits())) + .rowId(rowId.uuid()) .rowVersions(rowVersions) .timestamps(timestamps) .txId(txId)