This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 71a95cb5bd IGNITE-23436 Do not use ByteUtils#toBytes to persist
objects related to CMG (#4562)
71a95cb5bd is described below
commit 71a95cb5bd73f759e1480bfc3816d1fce2cfbebe
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 16 16:36:00 2024 +0400
IGNITE-23436 Do not use ByteUtils#toBytes to persist objects related to CMG
(#4562)
---
.../org/apache/ignite/network/ClusterNode.java | 3 +-
.../org/apache/ignite/network/NetworkAddress.java | 3 +-
modules/cluster-management/build.gradle | 3 +-
.../ClusterStatePersistentSerializer.java | 121 +++++++++++++++++++++
.../cluster/management/LocalStateStorage.java | 51 +++++++--
.../raft/ClusterStateStorageManager.java | 13 ++-
.../management/raft/CmgRaftGroupListener.java | 42 +++----
.../management/topology/LogicalTopologyImpl.java | 9 +-
.../management/topology/api/LogicalNode.java | 42 ++++---
.../topology/api/LogicalNodeSerializer.java | 96 ++++++++++++++++
.../topology/api/LogicalTopologySnapshot.java | 5 +-
.../api/LogicalTopologySnapshotSerializer.java | 64 +++++++++++
.../ClusterStatePersistentSerializerTest.java | 89 +++++++++++++++
.../cluster/management/LocalStateStorageTest.java | 81 ++++++++++++++
.../topology/api/LogicalNodeSerializerTest.java | 75 +++++++++++++
.../api/LogicalTopologySnapshotSerializerTest.java | 97 +++++++++++++++++
.../apache/ignite/internal/util/VarIntUtils.java | 68 ++++++++++--
.../ignite/internal/util/io/IgniteDataInput.java | 22 ++++
.../ignite/internal/util/io/IgniteDataOutput.java | 9 ++
.../internal/util/io/IgniteUnsafeDataInput.java | 15 ++-
.../internal/util/io/IgniteUnsafeDataOutput.java | 15 ++-
.../util/io/{VarInts.java => NaiveVarInts.java} | 12 +-
.../internal/versioned/VersionedSerialization.java | 82 ++++++++++++++
.../internal/versioned/VersionedSerializer.java | 98 +++++++++++++++++
.../ignite/internal/util/VarIntUtilsTest.java | 78 +++++++++----
.../io/{VarIntsTest.java => NaiveVarIntsTest.java} | 8 +-
...butionZoneManagerLogicalTopologyEventsTest.java | 18 ++-
.../DistributionZoneCausalityDataNodesTest.java | 11 +-
.../internal/network/ClusterNodeSerializer.java | 69 ++++++++++++
.../network/ClusterNodeSerializerTest.java | 63 +++++++++++
.../serialization/marshal/ProtocolMarshalling.java | 14 +--
.../PartitionCommandsMarshallerImpl.java | 2 +-
.../system/SystemDisasterRecoveryStorage.java | 7 +-
.../SystemDisasterRecoveryManagerImplTest.java | 10 +-
.../schema/PartitionCommandsMarshallerImpl.java | 2 +-
35 files changed, 1271 insertions(+), 126 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index b3a8cadaa4..674f9adf83 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -17,14 +17,13 @@
package org.apache.ignite.network;
-import java.io.Serializable;
import java.util.UUID;
import org.jetbrains.annotations.Nullable;
/**
* Representation of a node in a cluster.
*/
-public interface ClusterNode extends Serializable {
+public interface ClusterNode {
/**
* Returns the node's local ID.
*
diff --git
a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
index b909fd3080..9c8163fa91 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
@@ -17,7 +17,6 @@
package org.apache.ignite.network;
-import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -25,7 +24,7 @@ import java.util.regex.Pattern;
/**
* Representation of a network address that includes a host name and a port.
*/
-public class NetworkAddress implements Serializable {
+public class NetworkAddress {
/** Regexp for parsing strings in the "host:port" format. */
private static final Pattern ADDRESS_PATTERN =
Pattern.compile("(.+):(\\d+)");
diff --git a/modules/cluster-management/build.gradle
b/modules/cluster-management/build.gradle
index b702a46b15..0aa2d77855 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -49,10 +49,11 @@ dependencies {
testImplementation project(':ignite-core')
testImplementation project(':ignite-configuration')
testImplementation project(':ignite-network')
+ testImplementation project(':ignite-system-disaster-recovery')
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-network'))
- testImplementation project(':ignite-system-disaster-recovery')
+ testImplementation testFixtures(project(':ignite-vault'))
testImplementation libs.hamcrest.core
testImplementation libs.mockito.junit
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
new file mode 100644
index 0000000000..7bd3240751
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link ClusterState} instance.
+ */
+public class ClusterStatePersistentSerializer extends VersionedSerializer<ClusterState> {
+    private static final CmgMessagesFactory CMG_MSGS_FACTORY = new CmgMessagesFactory();
+
+    /** Serializer instance. */
+    public static final ClusterStatePersistentSerializer INSTANCE = new ClusterStatePersistentSerializer();
+
+    /**
+     * Writes the state fields in a fixed order: CMG node names, metastorage node names, version, cluster name, cluster ID,
+     * nullable initial cluster configuration and, finally, the nullable list of former cluster IDs.
+     *
+     * @param state State to write.
+     * @param out Output to write to.
+     * @throws IOException If writing to the output fails.
+     */
+    @Override
+    protected void writeExternalData(ClusterState state, IgniteDataOutput out) throws IOException {
+        writeStringSet(state.cmgNodes(), out);
+        writeStringSet(state.metaStorageNodes(), out);
+        out.writeUTF(state.version());
+        out.writeUTF(state.clusterTag().clusterName());
+        out.writeUuid(state.clusterTag().clusterId());
+        writeNullableString(state.initialClusterConfiguration(), out);
+
+        // A size of -1 marks a null list, which keeps it distinguishable from an empty one.
+        List<UUID> formerClusterIds = state.formerClusterIds();
+        out.writeVarInt(formerClusterIds == null ? -1 : formerClusterIds.size());
+        if (formerClusterIds != null) {
+            for (UUID clusterId : formerClusterIds) {
+                out.writeUuid(clusterId);
+            }
+        }
+    }
+
+    /** Writes the number of strings followed by each string. */
+    private static void writeStringSet(Set<String> strings, IgniteDataOutput out) throws IOException {
+        out.writeVarInt(strings.size());
+        for (String str : strings) {
+            out.writeUTF(str);
+        }
+    }
+
+    /**
+     * Writes a nullable string as a length prefix followed by its UTF-8 bytes; a prefix of -1 stands for {@code null}.
+     *
+     * @param str String to write, or {@code null}.
+     * @param out Output to write to.
+     * @throws IOException If writing to the output fails.
+     */
+    private static void writeNullableString(@Nullable String str, IgniteDataOutput out) throws IOException {
+        // The reader consumes exactly as many BYTES as the prefix announces, so the prefix must be the encoded length
+        // and not String#length() (a UTF-16 char count that is smaller for any non-ASCII content).
+        // For ASCII-only strings both values coincide, so the written form is unchanged for such data.
+        byte[] bytes = str == null ? null : str.getBytes(UTF_8);
+
+        out.writeVarInt(bytes == null ? -1 : bytes.length);
+        if (bytes != null) {
+            out.writeByteArray(bytes);
+        }
+    }
+
+    /**
+     * Reads the fields in exactly the order in which {@link #writeExternalData} writes them.
+     *
+     * @param protoVer Protocol version of the stored data (not consulted here).
+     * @param in Input to read from.
+     * @return Deserialized cluster state.
+     * @throws IOException If reading from the input fails.
+     */
+    @Override
+    protected ClusterState readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        return CMG_MSGS_FACTORY.clusterState()
+                .cmgNodes(readStringSet(in))
+                .metaStorageNodes(readStringSet(in))
+                .version(in.readUTF())
+                .clusterTag(ClusterTag.clusterTag(CMG_MSGS_FACTORY, in.readUTF(), in.readUuid()))
+                .initialClusterConfiguration(readNullableString(in))
+                .formerClusterIds(readFormerClusterIds(in))
+                .build();
+    }
+
+    /** Reads a string count followed by that many strings. */
+    private static Set<String> readStringSet(IgniteDataInput in) throws IOException {
+        int size = in.readVarIntAsInt();
+
+        Set<String> result = new HashSet<>(size);
+        for (int i = 0; i < size; i++) {
+            result.add(in.readUTF());
+        }
+
+        return result;
+    }
+
+    /** Reads a string written by {@link #writeNullableString}: a byte-length prefix (-1 for {@code null}), then UTF-8 bytes. */
+    private static @Nullable String readNullableString(IgniteDataInput in) throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+    }
+
+    /** Reads the list of former cluster IDs; a length of -1 stands for a {@code null} list. */
+    private static @Nullable List<UUID> readFormerClusterIds(IgniteDataInput in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        if (length == -1) {
+            return null;
+        }
+
+        List<UUID> result = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            result.add(in.readUuid());
+        }
+
+        return result;
+    }
+}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index d708e5e038..a23d88f202 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -17,12 +17,17 @@
package org.apache.ignite.internal.cluster.management;
-import java.io.Serializable;
+import java.io.IOException;
+import java.util.HashSet;
import java.util.Set;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
import org.jetbrains.annotations.Nullable;
/**
@@ -33,9 +38,7 @@ import org.jetbrains.annotations.Nullable;
class LocalStateStorage {
private static final ByteArray CMG_STATE_VAULT_KEY =
ByteArray.fromString("cmg_state");
- static class LocalState implements Serializable {
- private static final long serialVersionUID = -5069326157367860480L;
-
+ static class LocalState {
private final Set<String> cmgNodeNames;
private final ClusterTag clusterTag;
@@ -68,7 +71,11 @@ class LocalStateStorage {
@Nullable LocalState getLocalState() {
VaultEntry entry = vault.get(CMG_STATE_VAULT_KEY);
- return entry == null ? null : ByteUtils.fromBytes(entry.value());
+ if (entry == null) {
+ return null;
+ }
+
+ return VersionedSerialization.fromBytes(entry.value(),
LocalStateSerializer.INSTANCE);
}
/**
@@ -77,7 +84,7 @@ class LocalStateStorage {
* @param state Local state to save.
*/
void saveLocalState(LocalState state) {
- vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
+ vault.put(CMG_STATE_VAULT_KEY, VersionedSerialization.toBytes(state,
LocalStateSerializer.INSTANCE));
}
/**
@@ -86,4 +93,34 @@ class LocalStateStorage {
void clear() {
vault.remove(CMG_STATE_VAULT_KEY);
}
+
+    /** Versioned serializer for {@link LocalState}: CMG node names first, then the cluster tag's name and ID. */
+    private static class LocalStateSerializer extends VersionedSerializer<LocalState> {
+        private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new CmgMessagesFactory();
+
+        private static final LocalStateSerializer INSTANCE = new LocalStateSerializer();
+
+        /** Writes the node name count, every node name, the cluster name and the cluster ID, in that order. */
+        @Override
+        protected void writeExternalData(LocalState state, IgniteDataOutput out) throws IOException {
+            Set<String> nodeNames = state.cmgNodeNames();
+            ClusterTag tag = state.clusterTag();
+
+            out.writeVarInt(nodeNames.size());
+            for (String nodeName : nodeNames) {
+                out.writeUTF(nodeName);
+            }
+
+            out.writeUTF(tag.clusterName());
+            out.writeUuid(tag.clusterId());
+        }
+
+        /** Reads the fields back in the order in which they were written. */
+        @Override
+        protected LocalState readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+            int namesCount = in.readVarIntAsInt();
+
+            Set<String> nodeNames = new HashSet<>(namesCount);
+            for (int remaining = namesCount; remaining > 0; remaining--) {
+                nodeNames.add(in.readUTF());
+            }
+
+            // The cluster name precedes the cluster ID in the stream, so it has to be read first.
+            String clusterName = in.readUTF();
+
+            return new LocalState(nodeNames, ClusterTag.clusterTag(CMG_MESSAGES_FACTORY, clusterName, in.readUuid()));
+        }
+    }
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index 44fe6a989f..d4365a105b 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.cluster.management.raft;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import java.nio.ByteBuffer;
@@ -31,7 +29,10 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNodeSerializer;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
/**
@@ -62,7 +63,7 @@ public class ClusterStateStorageManager {
public ClusterState getClusterState() {
byte[] value = storage.get(CMG_STATE_KEY);
- return value == null ? null : fromBytes(value);
+ return value == null ? null : VersionedSerialization.fromBytes(value,
ClusterStatePersistentSerializer.INSTANCE);
}
/**
@@ -71,7 +72,7 @@ public class ClusterStateStorageManager {
* @param state Cluster state.
*/
public void putClusterState(ClusterState state) {
- storage.put(CMG_STATE_KEY, toBytes(state));
+ storage.put(CMG_STATE_KEY, VersionedSerialization.toBytes(state,
ClusterStatePersistentSerializer.INSTANCE));
}
/**
@@ -87,7 +88,7 @@ public class ClusterStateStorageManager {
* Marks the given node as validated.
*/
void putValidatedNode(LogicalNode node) {
- storage.put(validatedNodeKey(node.id()), toBytes(node));
+ storage.put(validatedNodeKey(node.id()),
VersionedSerialization.toBytes(node, LogicalNodeSerializer.INSTANCE));
}
/**
@@ -110,7 +111,7 @@ public class ClusterStateStorageManager {
* Returns a collection of nodes that passed the validation but have not
yet joined the logical topology.
*/
List<LogicalNode> getValidatedNodes() {
- return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) ->
fromBytes(v));
+ return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) ->
VersionedSerialization.fromBytes(v, LogicalNodeSerializer.INSTANCE));
}
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 0d724fdd92..297ad77350 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.cluster.management.raft;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.requireNonNullElse;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.IgniteUtils.capacity;
import java.io.Serializable;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -237,17 +239,15 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
}
private ValidationResult validateNode(JoinRequestCommand command) {
- ClusterNode node = command.node().asClusterNode();
-
Optional<LogicalNode> previousVersion =
logicalTopology.getLogicalTopology().nodes()
.stream()
- .filter(n -> n.name().equals(node.name()))
+ .filter(n -> n.name().equals(command.node().name()))
.findAny();
if (previousVersion.isPresent()) {
LogicalNode previousNode = previousVersion.get();
- if (previousNode.id().equals(node.id())) {
+ if (previousNode.id().equals(command.node().id())) {
return ValidationResult.successfulResult();
} else {
// Remove the previous node from the Logical Topology in case
we haven't received the disappeared event yet.
@@ -255,25 +255,15 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
}
}
- LogicalNode logicalNode = new LogicalNode(
- node,
- command.node().userAttributes(),
- command.node().systemAttributes(),
- command.node().storageProfiles()
- );
+ LogicalNode logicalNode =
logicalNodeFromClusterNodeMessage(command.node());
return
validationManager.validateNode(storageManager.getClusterState(), logicalNode,
command.igniteVersion(), command.clusterTag());
}
private ValidationResult completeValidation(JoinReadyCommand command) {
- ClusterNode node = command.node().asClusterNode();
+ ClusterNodeMessage clusterNodeMessage = command.node();
- LogicalNode logicalNode = new LogicalNode(
- node,
- command.node().userAttributes(),
- command.node().systemAttributes(),
- command.node().storageProfiles()
- );
+ LogicalNode logicalNode =
logicalNodeFromClusterNodeMessage(clusterNodeMessage);
if (validationManager.isNodeValidated(logicalNode)) {
ValidationResult validationResponse =
validationManager.completeValidation(logicalNode);
@@ -284,16 +274,28 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
return validationResponse;
} else {
- return ValidationResult.errorResult(String.format("Node \"%s\" has
not yet passed the validation step", node));
+ return ValidationResult.errorResult(String.format("Node \"%s\" has
not yet passed the validation step",
+ clusterNodeMessage.asClusterNode()));
}
}
+    /**
+     * Builds a {@link LogicalNode} from a node message, substituting empty collections for any attribute map or storage
+     * profile list that the message reports as {@code null}.
+     *
+     * @param message Message describing the node.
+     * @return Logical node built from the message.
+     */
+    private static LogicalNode logicalNodeFromClusterNodeMessage(ClusterNodeMessage message) {
+        return new LogicalNode(
+                message.asClusterNode(),
+                requireNonNullElse(message.userAttributes(), emptyMap()),
+                requireNonNullElse(message.systemAttributes(), emptyMap()),
+                requireNonNullElse(message.storageProfiles(), emptyList())
+        );
+    }
+
private void removeNodesFromLogicalTopology(NodesLeaveCommand command) {
Set<ClusterNode> nodes =
command.nodes().stream().map(ClusterNodeMessage::asClusterNode).collect(Collectors.toSet());
// Nodes will be removed from a topology, so it is safe to set
nodeAttributes to the default value
Set<LogicalNode> logicalNodes = nodes.stream()
- .map(n -> new LogicalNode(n, Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyList()))
+ .map(n -> new LogicalNode(n, emptyMap(), emptyMap(),
emptyList()))
.collect(Collectors.toSet());
logicalTopology.removeNodes(logicalNodes);
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 4c0a1495cd..4f634e4452 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -22,8 +22,6 @@ import static java.util.Comparator.comparing;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import java.util.ArrayList;
import java.util.List;
@@ -38,8 +36,10 @@ import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageMan
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
/**
@@ -74,7 +74,8 @@ public class LogicalTopologyImpl implements LogicalTopology {
private LogicalTopologySnapshot readLogicalTopology() {
byte[] bytes = storage.get(LOGICAL_TOPOLOGY_KEY);
- return bytes == null ? LogicalTopologySnapshot.INITIAL :
fromBytes(bytes);
+ return bytes == null ? LogicalTopologySnapshot.INITIAL
+ : VersionedSerialization.fromBytes(bytes,
LogicalTopologySnapshotSerializer.INSTANCE);
}
@Override
@@ -152,7 +153,7 @@ public class LogicalTopologyImpl implements LogicalTopology
{
}
private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) {
- storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
+ storage.put(LOGICAL_TOPOLOGY_KEY,
VersionedSerialization.toBytes(newTopology,
LogicalTopologySnapshotSerializer.INSTANCE));
}
@Override
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
index d4367dd456..9577d07a5f 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.cluster.management.topology.api;
-import java.util.Collections;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -26,6 +28,8 @@ import
org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.jetbrains.annotations.Nullable;
/**
* Representation of a logical node in a cluster.
@@ -57,11 +61,7 @@ public class LogicalNode extends ClusterNodeImpl {
* @param address Node address.
*/
public LogicalNode(UUID id, String name, NetworkAddress address) {
- super(id, name, address);
-
- this.userAttributes = Collections.emptyMap();
- this.systemAttributes = Collections.emptyMap();
- this.storageProfiles = Collections.emptyList();
+ this(id, name, address, null, emptyMap(), emptyMap(), emptyList());
}
/**
@@ -71,28 +71,24 @@ public class LogicalNode extends ClusterNodeImpl {
* @param userAttributes Node attributes defined in configuration.
*/
public LogicalNode(ClusterNode clusterNode, Map<String, String>
userAttributes) {
- this(clusterNode, userAttributes, Collections.emptyMap(),
Collections.emptyList());
+ this(clusterNode, userAttributes, emptyMap(), emptyList());
}
/**
* Constructor.
*
- * @param clusterNode Represents a node in a cluster.
+ * @param node Represents a node in a cluster.
* @param userAttributes Node attributes defined in configuration.
* @param systemAttributes Internal node attributes provided by system
components at startup.
* @param storageProfiles List of storage profiles, which the node
supports.
*/
public LogicalNode(
- ClusterNode clusterNode,
+ ClusterNode node,
Map<String, String> userAttributes,
Map<String, String> systemAttributes,
List<String> storageProfiles
) {
- super(clusterNode.id(), clusterNode.name(), clusterNode.address(),
clusterNode.nodeMetadata());
-
- this.userAttributes = userAttributes == null ? Collections.emptyMap()
: userAttributes;
- this.systemAttributes = systemAttributes == null ?
Collections.emptyMap() : systemAttributes;
- this.storageProfiles = storageProfiles;
+ this(node.id(), node.name(), node.address(), node.nodeMetadata(),
userAttributes, systemAttributes, storageProfiles);
}
/**
@@ -101,7 +97,23 @@ public class LogicalNode extends ClusterNodeImpl {
* @param clusterNode Represents a node in a cluster.
*/
public LogicalNode(ClusterNode clusterNode) {
- this(clusterNode, Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyList());
+ this(clusterNode, emptyMap(), emptyMap(), emptyList());
+ }
+
+ /**
+ * Constructor taking every field explicitly.
+ *
+ * <p>The attribute maps and the storage profile list are stored as unmodifiable copies. {@code Map.copyOf} and
+ * {@code List.copyOf} reject {@code null}, so passing a {@code null} collection (or one containing {@code null})
+ * results in a {@link NullPointerException}; callers must substitute empty collections themselves.
+ *
+ * @param id Node ID.
+ * @param name Node name.
+ * @param address Node address.
+ * @param nodeMetadata Node metadata, may be {@code null}.
+ * @param userAttributes Node attributes defined in configuration, must not be {@code null}.
+ * @param systemAttributes Internal node attributes, must not be {@code null}.
+ * @param storageProfiles Storage profiles supported by the node, must not be {@code null}.
+ */
+ LogicalNode(
+ UUID id,
+ String name,
+ NetworkAddress address,
+ @Nullable NodeMetadata nodeMetadata,
+ Map<String, String> userAttributes,
+ Map<String, String> systemAttributes,
+ List<String> storageProfiles
+ ) {
+ super(id, name, address, nodeMetadata);
+
+ this.userAttributes = Map.copyOf(userAttributes);
+ this.systemAttributes = Map.copyOf(systemAttributes);
+ this.storageProfiles = List.copyOf(storageProfiles);
+ }
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
new file mode 100644
index 0000000000..6a7a782bf5
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.ignite.internal.network.ClusterNodeSerializer;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * {@link VersionedSerializer} for {@link LogicalNode} instances.
+ */
+public class LogicalNodeSerializer extends VersionedSerializer<LogicalNode> {
+    /** Serializer instance. */
+    public static final LogicalNodeSerializer INSTANCE = new LogicalNodeSerializer();
+
+    private final ClusterNodeSerializer clusterNodeSerializer = ClusterNodeSerializer.INSTANCE;
+
+    /** Writes the cluster node part first, then user attributes, system attributes and storage profiles. */
+    @Override
+    protected void writeExternalData(LogicalNode node, IgniteDataOutput out) throws IOException {
+        clusterNodeSerializer.writeExternal(node, out);
+
+        writeStringToStringMap(node.userAttributes(), out);
+        writeStringToStringMap(node.systemAttributes(), out);
+        writeStringList(node.storageProfiles(), out);
+    }
+
+    /** Writes the entry count followed by each key and its value. */
+    private static void writeStringToStringMap(Map<String, String> map, IgniteDataOutput output) throws IOException {
+        output.writeVarInt(map.size());
+
+        for (Entry<String, String> attribute : map.entrySet()) {
+            output.writeUTF(attribute.getKey());
+            output.writeUTF(attribute.getValue());
+        }
+    }
+
+    /** Writes the element count followed by each element. */
+    private static void writeStringList(List<String> strings, IgniteDataOutput output) throws IOException {
+        output.writeVarInt(strings.size());
+
+        for (String element : strings) {
+            output.writeUTF(element);
+        }
+    }
+
+    /** Reads the parts back in the order in which they were written. */
+    @Override
+    protected LogicalNode readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        ClusterNode clusterNode = clusterNodeSerializer.readExternal(in);
+
+        Map<String, String> userAttrs = readStringToStringMap(in);
+        Map<String, String> systemAttrs = readStringToStringMap(in);
+        List<String> profiles = readStringList(in);
+
+        return new LogicalNode(clusterNode, userAttrs, systemAttrs, profiles);
+    }
+
+    /** Reads an entry count followed by that many key/value pairs. */
+    private static Map<String, String> readStringToStringMap(IgniteDataInput in) throws IOException {
+        int entryCount = in.readVarIntAsInt();
+
+        Map<String, String> result = new HashMap<>(IgniteUtils.capacity(entryCount));
+        for (int remaining = entryCount; remaining > 0; remaining--) {
+            // The key is stored before its value, so it has to be read first.
+            String key = in.readUTF();
+            String value = in.readUTF();
+
+            result.put(key, value);
+        }
+
+        return result;
+    }
+
+    /** Reads an element count followed by that many strings. */
+    private static List<String> readStringList(IgniteDataInput in) throws IOException {
+        int elementCount = in.readVarIntAsInt();
+
+        List<String> result = new ArrayList<>(elementCount);
+        for (int remaining = elementCount; remaining > 0; remaining--) {
+            result.add(in.readUTF());
+        }
+
+        return result;
+    }
+}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
index ed2823b5dc..79b29d93d3 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.cluster.management.topology.api;
import static java.util.Collections.emptySet;
-import java.io.Serializable;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
@@ -33,9 +32,7 @@ import org.jetbrains.annotations.TestOnly;
*
* <p>Instances of this class are immutable.
*/
-public class LogicalTopologySnapshot implements Serializable {
- private static final long serialVersionUID = 0L;
-
+public class LogicalTopologySnapshot {
/** Version that first topology snapshot in history will have. */
public static final long FIRST_VERSION = 1;
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java
new file mode 100644
index 0000000000..e281ab9198
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link LogicalTopologySnapshot} instances.
+ */
+public class LogicalTopologySnapshotSerializer extends
VersionedSerializer<LogicalTopologySnapshot> {
+ /** Serializer instance. */
+ public static final LogicalTopologySnapshotSerializer INSTANCE = new
LogicalTopologySnapshotSerializer();
+
+ private final LogicalNodeSerializer logicalNodeSerializer =
LogicalNodeSerializer.INSTANCE;
+
+ @Override
+ protected void writeExternalData(LogicalTopologySnapshot snapshot,
IgniteDataOutput out) throws IOException {
+ out.writeVarInt(snapshot.version());
+
+ out.writeVarInt(snapshot.nodes().size());
+ for (LogicalNode node : snapshot.nodes()) {
+ logicalNodeSerializer.writeExternal(node, out);
+ }
+
+ out.writeUuid(snapshot.clusterId());
+ }
+
+ @Override
+ protected LogicalTopologySnapshot readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ long version = in.readVarInt();
+
+ int nodesCount = in.readVarIntAsInt();
+ Set<LogicalNode> nodes = new
HashSet<>(IgniteUtils.capacity(nodesCount));
+ for (int i = 0; i < nodesCount; i++) {
+ nodes.add(logicalNodeSerializer.readExternal(in));
+ }
+
+ UUID clusterId = in.readUuid();
+
+ return new LogicalTopologySnapshot(version, nodes, clusterId);
+ }
+}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java
new file mode 100644
index 0000000000..eb8191bb5c
--- /dev/null
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.UUID.randomUUID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class ClusterStatePersistentSerializerTest {
+ private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new
CmgMessagesFactory();
+
+ private final ClusterStatePersistentSerializer serializer = new
ClusterStatePersistentSerializer();
+
+ @Test
+ void serializationAndDeserializationWithoutNulls() {
+ ClusterState originalState = CMG_MESSAGES_FACTORY.clusterState()
+ .cmgNodes(Set.of("a", "b"))
+ .metaStorageNodes(Set.of("c", "d"))
+ .version("3.0.0")
+ .clusterTag(ClusterTag.randomClusterTag(CMG_MESSAGES_FACTORY,
"cluster"))
+ .initialClusterConfiguration("config")
+ .formerClusterIds(List.of(randomUUID(), randomUUID()))
+ .build();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalState,
serializer);
+ ClusterState restoredState = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredState, is(equalTo(originalState)));
+ }
+
+ @Test
+ void serializationAndDeserializationWithNulls() {
+ ClusterState originalState = CMG_MESSAGES_FACTORY.clusterState()
+ .cmgNodes(Set.of("a", "b"))
+ .metaStorageNodes(Set.of("c", "d"))
+ .version("3.0.0")
+ .clusterTag(ClusterTag.randomClusterTag(CMG_MESSAGES_FACTORY,
"cluster"))
+ .initialClusterConfiguration(null)
+ .formerClusterIds(null)
+ .build();
+
+ byte[] bytes = VersionedSerialization.toBytes(originalState,
serializer);
+ ClusterState restoredState = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredState, is(equalTo(originalState)));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwMBYQFiAwFjAWQFMy4wLjAHY2x1c3Rlcp1Ct7dR35ELuoeboFbabrgHY29uZmlnAztFBahaoEfJtxGam"
+ + "Q6WXJNFRfruL76Bv254dP54iF6V");
+ ClusterState restoredState = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredState.cmgNodes(), containsInAnyOrder("a", "b"));
+ assertThat(restoredState.metaStorageNodes(), containsInAnyOrder("c",
"d"));
+ assertThat(restoredState.version(), is("3.0.0"));
+ assertThat(restoredState.initialClusterConfiguration(), is("config"));
+ assertThat(
+ restoredState.formerClusterIds(),
+
contains(UUID.fromString("c947a05a-a805-453b-935c-960e999a11b7"),
UUID.fromString("bf81be2f-eefa-4545-955e-8878fe74786e"))
+ );
+ }
+}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java
new file mode 100644
index 0000000000..ca97f2b7be
--- /dev/null
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Base64;
+import java.util.Set;
+import java.util.UUID;
+import
org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LocalStateStorageTest {
+ private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new
CmgMessagesFactory();
+
+ private VaultManager vault;
+
+ private LocalStateStorage storage;
+
+ @BeforeEach
+ void setUp() {
+ vault = new VaultManager(new InMemoryVaultService());
+
+ assertThat(vault.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ storage = new LocalStateStorage(vault);
+ }
+
+ @Test
+ void serializationAndDeserialization() {
+ LocalState originalState = new LocalState(
+ Set.of("a", "b"),
+ ClusterTag.clusterTag(CMG_MESSAGES_FACTORY, "cluster", new
UUID(0x12345678L, 0x87654321L))
+ );
+
+ storage.saveLocalState(originalState);
+
+ LocalState restoredState = storage.getLocalState();
+
+ assertThat(restoredState, is(notNullValue()));
+ assertThat(restoredState.cmgNodeNames(), containsInAnyOrder("a", "b"));
+ assertThat(restoredState.clusterTag(), is(originalState.clusterTag()));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ vault.put(new ByteArray("cmg_state"),
Base64.getDecoder().decode("Ae++QwMBYQFiB2NsdXN0ZXLvzauQeFY0EiFDZYcJutz+"));
+
+ LocalState localState = storage.getLocalState();
+
+ assertThat(localState, is(notNullValue()));
+ assertThat(localState.cmgNodeNames(), containsInAnyOrder("a", "b"));
+ assertThat(localState.clusterTag().clusterName(), is("cluster"));
+ assertThat(localState.clusterTag().clusterId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ }
+}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java
new file mode 100644
index 0000000000..72d2af3dd6
--- /dev/null
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class LogicalNodeSerializerTest {
+ private final LogicalNodeSerializer serializer = new
LogicalNodeSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ LogicalNode originalNode = new LogicalNode(
+ UUID.randomUUID(),
+ "test",
+ new NetworkAddress("host", 3000),
+ new NodeMetadata("ext-host", 3001, 3002),
+ Map.of("ukey", "uval"),
+ Map.of("skey", "sval"),
+ List.of("profile")
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalNode,
serializer);
+ LogicalNode restoredNode = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredNode.id(), equalTo(originalNode.id()));
+ assertThat(restoredNode.name(), equalTo("test"));
+ assertThat(restoredNode.address(), equalTo(new NetworkAddress("host",
3000)));
+ assertThat(restoredNode.nodeMetadata(), equalTo(new
NodeMetadata("ext-host", 3001, 3002)));
+ assertThat(restoredNode.userAttributes(), equalTo(Map.of("ukey",
"uval")));
+ assertThat(restoredNode.systemAttributes(), equalTo(Map.of("skey",
"sval")));
+ assertThat(restoredNode.storageProfiles(),
equalTo(List.of("profile")));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++QwHvvkPvzauQeFY0EiFDZYcJutz+BHRlc3QEaG9zdLkXAQhleHQtaG9zdLoXuxcCBHVrZXkEdXZhbAI"
+ + "Ec2tleQRzdmFsAgdwcm9maWxl");
+
+ LogicalNode restoredNode = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredNode.id(), is(new UUID(0x1234567890ABCDEFL,
0xFEDCBA0987654321L)));
+ assertThat(restoredNode.name(), equalTo("test"));
+ assertThat(restoredNode.address(), equalTo(new NetworkAddress("host",
3000)));
+ assertThat(restoredNode.nodeMetadata(), equalTo(new
NodeMetadata("ext-host", 3001, 3002)));
+ assertThat(restoredNode.userAttributes(), equalTo(Map.of("ukey",
"uval")));
+ assertThat(restoredNode.systemAttributes(), equalTo(Map.of("skey",
"sval")));
+ assertThat(restoredNode.storageProfiles(),
equalTo(List.of("profile")));
+ }
+}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java
new file mode 100644
index 0000000000..dcc911a5da
--- /dev/null
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class LogicalTopologySnapshotSerializerTest {
+ private final LogicalTopologySnapshotSerializer serializer = new
LogicalTopologySnapshotSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ LogicalTopologySnapshot originalSnapshot = new
LogicalTopologySnapshot(123L, List.of(node(0), node(1)), new UUID(1, 2));
+
+ byte[] bytes = VersionedSerialization.toBytes(originalSnapshot,
serializer);
+ LogicalTopologySnapshot restoredSnapshot =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredSnapshot.version(), is(originalSnapshot.version()));
+ assertThat(restoredSnapshot.nodes(),
equalTo(originalSnapshot.nodes()));
+ assertThat(restoredSnapshot.clusterId(),
equalTo(originalSnapshot.clusterId()));
+ }
+
+ private static LogicalNode node(int index) {
+ return new LogicalNode(
+ new UUID(0xDEADBEEFCAFEBABEL, index),
+ "node" + index,
+ new NetworkAddress("host" + index, 3000 + index),
+ new NodeMetadata("rest-host" + index, 80 + index, 443 + index),
+ Map.of("ukey", "uval" + index),
+ Map.of("skey", "sval" + index),
+ List.of("prof1", "prof2")
+ );
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++Q3wDAe++QwHvvkO+uv7K776t3gAAAAAAAAAABW5vZGUwBWhvc3QwuRcBCnJlc3QtaG9zdDBRvAMCBHVrZ"
+ +
"XkFdXZhbDACBHNrZXkFc3ZhbDADBXByb2YxBXByb2YyAe++QwHvvkO+uv7K776t3gEAAAAAAAAABW5vZGUxBWhvc3QxuhcBCnJlc3QtaG9zdDFSvQMCBH"
+ +
"VrZXkFdXZhbDECBHNrZXkFc3ZhbDEDBXByb2YxBXByb2YyAQAAAAAAAAACAAAAAAAAAA==");
+ LogicalTopologySnapshot restoredSnapshot =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredSnapshot.version(), is(123L));
+ assertThat(restoredSnapshot.nodes(), hasSize(2));
+
+ List<LogicalNode> orderedNodes = restoredSnapshot.nodes().stream()
+ .sorted(Comparator.comparing(LogicalNode::id))
+ .collect(toList());
+
+ LogicalNode node0 = orderedNodes.get(0);
+ assertThat(node0.id(), is(new UUID(0xDEADBEEFCAFEBABEL, 0)));
+ assertThat(node0.name(), is("node0"));
+ assertThat(node0.address(), is(new NetworkAddress("host0", 3000)));
+ assertThat(node0.nodeMetadata(), is(new NodeMetadata("rest-host0", 80,
443)));
+ assertThat(node0.userAttributes(), is(Map.of("ukey", "uval0")));
+ assertThat(node0.systemAttributes(), is(Map.of("skey", "sval0")));
+ assertThat(node0.storageProfiles(), is(List.of("prof1", "prof2")));
+
+ LogicalNode node1 = orderedNodes.get(1);
+ assertThat(node1.id(), is(new UUID(0xDEADBEEFCAFEBABEL, 1)));
+ assertThat(node1.name(), is("node1"));
+ assertThat(node1.address(), is(new NetworkAddress("host1", 3001)));
+ assertThat(node1.nodeMetadata(), is(new NodeMetadata("rest-host1", 81,
444)));
+ assertThat(node1.userAttributes(), is(Map.of("ukey", "uval1")));
+ assertThat(node1.systemAttributes(), is(Map.of("skey", "sval1")));
+ assertThat(node1.storageProfiles(), is(List.of("prof1", "prof2")));
+
+ assertThat(restoredSnapshot.clusterId(), equalTo(new UUID(1, 2)));
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
index 0688656c09..6a43e19ab0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.util;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
/**
@@ -35,16 +38,16 @@ import java.nio.ByteBuffer;
*/
public class VarIntUtils {
/**
- * Returns number of bytes that are needed to represent the given integer
as a varint.
+ * Returns number of bytes that are needed to represent the given long as
a varint.
*
* @param val Int value.
*/
- public static int varIntLength(int val) {
+ public static int varIntLength(long val) {
val++;
int len = 0;
- while ((val & 0xFFFF_FF80) != 0) {
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
len++;
val >>>= 7;
@@ -53,6 +56,31 @@ public class VarIntUtils {
return len + 1;
}
+ /**
+ * Writes a primitive {@code long} value as a varint to the provided
output.
+ *
+ * @param val Value.
+ * @return Number of bytes written.
+ */
+ public static int writeVarInt(long val, DataOutput out) throws IOException
{
+ val++;
+
+ int written = 0;
+
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+ byte b = (byte) (val | 0x80);
+
+ out.writeByte(b);
+
+ val >>>= 7;
+ written++;
+ }
+
+ out.writeByte((byte) val);
+
+ return written + 1;
+ }
+
/**
* Writes a primitive {@code int} value as a varint to the provided array.
*
@@ -61,12 +89,12 @@ public class VarIntUtils {
* @param off Offset in the target array to write result to.
* @return Number of bytes overwritten in {@code bytes} array.
*/
- public static int putVarIntToBytes(int val, byte[] bytes, int off) {
+ public static int putVarIntToBytes(long val, byte[] bytes, int off) {
val++;
int pos = off;
- while ((val & 0xFFFF_FF80) != 0) {
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
byte b = (byte) (val | 0x80);
bytes[pos++] = b;
@@ -83,15 +111,37 @@ public class VarIntUtils {
* Reads a varint from a buffer.
*
* @param buf Buffer from which to read.
- * @return Integer value that was encoded as a varint.
+ * @return Long value that was encoded as a varint.
*/
- public static int readVarInt(ByteBuffer buf) {
- int res = 0;
+ public static long readVarInt(ByteBuffer buf) {
+ long res = 0;
for (int shift = 0; ; shift += 7) {
byte b = buf.get();
- res |= (b & 0x7F) << shift;
+ res |= ((long) b & 0x7F) << shift;
+
+ if (b >= 0) {
+ break;
+ }
+ }
+
+ return res - 1;
+ }
+
+ /**
+ * Reads a varint from an input.
+ *
+ * @param in Input from which to read.
+ * @return Long value that was encoded as a varint.
+ */
+ public static long readVarInt(DataInput in) throws IOException {
+ long res = 0;
+
+ for (int shift = 0; ; shift += 7) {
+ byte b = in.readByte();
+
+ res |= ((long) b & 0x7F) << shift;
if (b >= 0) {
break;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
index 0722241f6e..93d449e270 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
@@ -95,6 +95,28 @@ public interface IgniteDataInput extends DataInput {
*/
int read(byte[] b, int off, int len) throws IOException;
+ /**
+ * Reads a long value as a varint.
+ *
+ * @throws IOException If something goes wrong.
+ * @see IgniteDataOutput#writeVarInt(long)
+ */
+ long readVarInt() throws IOException;
+
+ /**
+ * Reads a varint and returns it as an int.
+ *
+ * @throws IOException If something goes wrong or if the value read does not fit into the int range.
+ * @see IgniteDataOutput#writeVarInt(long)
+ */
+ default int readVarIntAsInt() throws IOException {
+ long val = readVarInt();
+ if (val < Integer.MIN_VALUE || val > Integer.MAX_VALUE) {
+ throw new IOException("The value is expected to fit into int
range, but it doesn't: " + val);
+ }
+ return (int) val;
+ }
+
/**
* Reads array of {@code byte}s.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
index c1d54f24d8..befdacf036 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
@@ -75,6 +75,15 @@ public interface IgniteDataOutput extends DataOutput {
*/
void cleanup();
+ /**
+ * Writes a long value as a varint. Non-negative values and -1 are encoded
efficiently with respect to compactness.
+ * Other negative values (like -2) take a lot more space.
+ *
+ * @param val Value to write.
+ * @throws IOException If something goes wrong.
+ */
+ void writeVarInt(long val) throws IOException;
+
/**
* Writes array of {@code byte}s.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
index d3df8eea62..0f5d3655c0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -32,7 +32,6 @@ import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
@@ -46,6 +45,7 @@ import
org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.util.FastTimestamps;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.VarIntUtils;
/**
* Data input based on {@code Unsafe} operations.
@@ -465,10 +465,10 @@ public class IgniteUnsafeDataInput extends InputStream
implements IgniteDataInpu
/** {@inheritDoc} */
@Override
public UUID readUuid() throws IOException {
- int length = readByte();
- byte[] bytes = readByteArray(length);
+ long high = readLong();
+ long low = readLong();
- return UUID.fromString(new String(bytes, StandardCharsets.UTF_8));
+ return new UUID(high, low);
}
/** {@inheritDoc} */
@@ -704,6 +704,11 @@ public class IgniteUnsafeDataInput extends InputStream
implements IgniteDataInpu
}
}
+ @Override
+ public long readVarInt() throws IOException {
+ return VarIntUtils.readVarInt(this);
+ }
+
/** {@inheritDoc} */
@Override
public String readLine() throws IOException {
@@ -740,7 +745,7 @@ public class IgniteUnsafeDataInput extends InputStream
implements IgniteDataInpu
/** {@inheritDoc} */
@Override
public String readUTF() throws IOException {
- return readUtfBody(VarInts.readUnsignedInt(this));
+ return readUtfBody(NaiveVarInts.readUnsignedInt(this));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
index 7dee8c054d..4996262e93 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
@@ -44,6 +43,7 @@ import
org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.internal.util.FastTimestamps;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.StringIntrospection;
+import org.apache.ignite.internal.util.VarIntUtils;
/**
* Data output based on {@code Unsafe} operations.
@@ -428,10 +428,8 @@ public class IgniteUnsafeDataOutput extends OutputStream
implements IgniteDataOu
/** {@inheritDoc} */
@Override
public void writeUuid(UUID val) throws IOException {
- byte[] bytes = val.toString().getBytes(StandardCharsets.US_ASCII);
-
- writeByte(bytes.length);
- writeByteArray(bytes);
+ writeLong(val.getMostSignificantBits());
+ writeLong(val.getLeastSignificantBits());
}
/** {@inheritDoc} */
@@ -499,6 +497,11 @@ public class IgniteUnsafeDataOutput extends OutputStream
implements IgniteDataOu
out = null;
}
+ @Override
+ public void writeVarInt(long val) throws IOException {
+ VarIntUtils.writeVarInt(val, this);
+ }
+
/** {@inheritDoc} */
@Override
public void writeByteArray(byte[] arr) throws IOException {
@@ -716,7 +719,7 @@ public class IgniteUnsafeDataOutput extends OutputStream
implements IgniteDataOu
* @throws IOException In case of error.
*/
private void writeUtf(String s, int utfLen) throws IOException {
- VarInts.writeUnsignedInt(utfLen, this);
+ NaiveVarInts.writeUnsignedInt(utfLen, this);
if (utfLen == s.length()) {
writeAsciiStringBytes(s);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
similarity index 84%
rename from
modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
index c1e9d4c8a9..095fdc298d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
@@ -20,12 +20,18 @@ package org.apache.ignite.internal.util.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.ignite.internal.util.VarIntUtils;
/**
- * Utils to read/write variable length ints.
+ * Utils to read/write naive variable length ints.
+ *
+ * <p>The 'naivety' relates to a simple algorithm. The naive varints produce
more compact results than the 'general purpose' varints
+ * (see {@link VarIntUtils}) when the input integer is approximately between
127 and 254, so they seem to be more appropriate
+ * for small values (like lengths of short strings and collections).
*/
-public class VarInts {
- private VarInts() {
+// TODO: IGNITE-23461 remove this class, use VarIntUtils instead.
+public class NaiveVarInts {
+ private NaiveVarInts() {
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
new file mode 100644
index 0000000000..a6c539430b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.versioned;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.IOException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+
+/**
+ * The entry point API to work with versioned serialization (used to persist
objects).
+ *
+ * <p>To make it possible to persist an object, a {@link VersionedSerializer}
is to be implemented and used with
+ * {@link #toBytes(Object, VersionedSerializer)} and {@link #fromBytes(byte[],
VersionedSerializer)}.
+ *
+ * <p>It is the responsibility of a serializer to support transparent object
structure changes (addition of fields and so on).
+ *
+ * @see VersionedSerializer
+ */
+public class VersionedSerialization {
+ /** Initial capacity (in bytes) of the buffer used for data output. */
+ private static final int INITIAL_BUFFER_CAPACITY = 256;
+
+ /**
+ * Converts an object to bytes (including protocol version) that can be
later converted back to an object
+ * using {@link #fromBytes(byte[], VersionedSerializer)}.
+ *
+ * @param object Object to serialize.
+ * @param serializer Serializer to do the serialization.
+ * @return Bytes representing the object.
+ * @see #fromBytes(byte[], VersionedSerializer)
+ */
+ public static <T> byte[] toBytes(T object, VersionedSerializer<T>
serializer) {
+ try (IgniteUnsafeDataOutput out = new
IgniteUnsafeDataOutput(INITIAL_BUFFER_CAPACITY)) {
+ serializer.writeExternal(object, out);
+
+ return out.array();
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, "Cannot
serialize", e);
+ }
+ }
+
+ /**
+ * Deserializes an object serialized with {@link #toBytes(Object,
VersionedSerializer)}.
+ *
+ * @param bytes Bytes to deserialize from.
+ * @param serializer Serializer to use.
+ * @return Deserialized object.
+ */
+ public static <T> T fromBytes(byte[] bytes, VersionedSerializer<T>
serializer) {
+ IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(bytes);
+
+ try {
+ T result = serializer.readExternal(in);
+
+ if (in.available() != 0) {
+ throw new IOException(in.available() + " bytes left unread
after deserializing " + result);
+ }
+
+ return result;
+ } catch (IOException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, "Cannot
deserialize", e);
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
new file mode 100644
index 0000000000..a2d78c5d5d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.versioned;
+
+import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * Serializes and deserializes objects in a versioned way: that is, includes
version to make it possible to deserialize objects serialized
+ * (and persisted) in earlier versions later when the corresponding class's
structure has changed.
+ *
+ * <p>It is supposed to be used for the cases when some object needs to be
persisted to be stored for some time (maybe, a long time).
+ *
+ * <p>If a format is changed (for example, a new field is added), the version
returned by {@link #getProtocolVersion()} has
+ * to be incremented.
+ */
+public abstract class VersionedSerializer<T> {
+ /** Magic number to detect correct serialized objects. */
+ private static final int MAGIC = 0x43BEEF00;
+
+ /**
+ * Returns protocol version.
+ */
+ protected byte getProtocolVersion() {
+ return 1;
+ }
+
+ /**
+ * Save object's specific (that is, ignoring the signature and version)
data content.
+ *
+ * @param object object to write.
+ * @param out Output to write data content.
+ * @throws IOException If an I/O error occurs.
+ */
+ protected abstract void writeExternalData(T object, IgniteDataOutput out)
throws IOException;
+
+ /**
+ * Writes an object to an output, including a signature and version.
+ *
+ * @param object Object to write.
+ * @param out Output to which to write.
+ * @throws IOException If an I/O error occurs.
+ */
+ public final void writeExternal(T object, IgniteDataOutput out) throws
IOException {
+ int hdr = MAGIC + Byte.toUnsignedInt(getProtocolVersion());
+
+ out.writeInt(hdr);
+
+ writeExternalData(object, out);
+ }
+
+ /**
+ * Load object's specific data content.
+ *
+ * @param protoVer Input object version.
+ * @param in Input to load data content.
+ * @throws IOException If an I/O error occurs.
+ */
+ protected abstract T readExternalData(byte protoVer, IgniteDataInput in)
throws IOException;
+
+ /**
+ * Reads an object which was earlier saved with {@link #writeExternal(Object, IgniteDataOutput)}.
+ *
+ * <p>The header is validated first: its upper 3 bytes must be exactly equal to the magic signature; the lowest byte
+ * carries the protocol version, which is passed to {@link #readExternalData(byte, IgniteDataInput)}.
+ *
+ * @param in Input from which to read.
+ * @return Object produced by {@link #readExternalData(byte, IgniteDataInput)}.
+ * @throws IOException If an I/O error occurs or if the header does not carry the expected signature.
+ * @see #writeExternal(Object, IgniteDataOutput)
+ */
+ public final T readExternal(IgniteDataInput in) throws IOException {
+ int hdr = in.readInt();
+
+ // Compare the upper 3 bytes for equality. Checking '(hdr & MAGIC) != MAGIC' would only verify that the bits of
+ // MAGIC are set, so it would accept corrupted headers having extra bits set (for instance, 0xFFFFFF01).
+ if ((hdr & 0xFFFFFF00) != MAGIC) {
+ throw new IOException("Unexpected serialized object header " + "[actual=" + Integer.toHexString(hdr)
+ + ", expected=" + Integer.toHexString(MAGIC) + "]");
+ }
+
+ // The lowest byte of the header is the protocol version written by writeExternal().
+ byte ver = (byte) (hdr & 0xFF);
+
+ return readExternalData(ver, in);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
index 6f1ebc1b75..374e61d000 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.Matchers.is;
import java.nio.ByteBuffer;
import java.util.stream.Stream;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -29,34 +31,56 @@ import org.junit.jupiter.params.provider.MethodSource;
class VarIntUtilsTest {
@ParameterizedTest
- @MethodSource("sampleInts")
- void readingAndWritingVarintsIsCompatible(int intVal) {
+ @MethodSource("sampleLongs")
+ void readingAndWritingVarintsInMemoryIsCompatible(long longVal) {
byte[] array = new byte[10];
- VarIntUtils.putVarIntToBytes(intVal, array, 1);
+ VarIntUtils.putVarIntToBytes(longVal, array, 1);
ByteBuffer buf = ByteBuffer.wrap(array);
buf.position(1);
- assertThat(VarIntUtils.readVarInt(buf), is(intVal));
+ assertThat(VarIntUtils.readVarInt(buf), is(longVal));
}
@ParameterizedTest
- @MethodSource("sampleInts")
- void writingVarIntReturnsNumberOfBytesInItsRepresentation(int intVal) {
+ @MethodSource("sampleLongs")
+ void readingAndWritingVarintsInIoIsCompatible(long longVal) throws
Exception {
+ IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(100);
+
+ VarIntUtils.writeVarInt(longVal, out);
+
+ IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(out.array());
+
+ assertThat(VarIntUtils.readVarInt(in), is(longVal));
+ }
+
+ @ParameterizedTest
+ @MethodSource("sampleLongs")
+ void writingVarIntInMemoryReturnsNumberOfBytesInItsRepresentation(long
longVal) {
byte[] array = new byte[10];
- int len = VarIntUtils.putVarIntToBytes(intVal, array, 0);
+ int len = VarIntUtils.putVarIntToBytes(longVal, array, 0);
- assertThat(VarIntUtils.varIntLength(intVal), is(len));
+ assertThat(VarIntUtils.varIntLength(longVal), is(len));
}
@ParameterizedTest
- @MethodSource("sampleInts")
- void readingVarIntConsumesExactlyItsBytes(int intVal) {
+ @MethodSource("sampleLongs")
+ void writingVarIntInIoReturnsNumberOfBytesInItsRepresentation(long
longVal) throws Exception {
+ IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(100);
+
+ int len = VarIntUtils.writeVarInt(longVal, out);
+
+ assertThat(VarIntUtils.varIntLength(longVal), is(len));
+ }
+
+ @ParameterizedTest
+ @MethodSource("sampleLongs")
+ void readingVarIntFromMemoryConsumesExactlyItsBytes(long longVal) {
byte[] array = new byte[10];
- int len = VarIntUtils.putVarIntToBytes(intVal, array, 0);
+ int len = VarIntUtils.putVarIntToBytes(longVal, array, 0);
ByteBuffer buf = ByteBuffer.wrap(array);
@@ -65,16 +89,30 @@ class VarIntUtilsTest {
assertThat(buf.position(), is(len));
}
- private static Stream<Arguments> sampleInts() {
+ @ParameterizedTest
+ @MethodSource("sampleLongs")
+ void readingVarIntFromIoConsumesExactlyItsBytes(long longVal) throws
Exception {
+ byte[] array = new byte[10];
+
+ int len = VarIntUtils.putVarIntToBytes(longVal, array, 0);
+
+ IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(array);
+
+ VarIntUtils.readVarInt(in);
+
+ assertThat(in.available(), is(array.length - len));
+ }
+
+ private static Stream<Arguments> sampleLongs() {
return Stream.of(
- -1, 0, 1,
- 128 - 2, 128 - 1, 255, 256,
- 128 * 128 - 2, 128 * 128 - 1,
- 65535, 65536,
- 128 * 128 * 128 - 2, 128 * 128 * 128 - 1,
- 16777215, 16777216,
- 128 * 128 * 128 * 128 - 2, 128 * 128 * 128 * 128 - 1,
- 2147483647
+ -1L, 0L, 1L,
+ 128L - 2, 128L - 1, 255L, 256L,
+ 128L * 128 - 2, 128L * 128 - 1,
+ 65535L, 65536L,
+ 128L * 128 * 128 - 2, 128L * 128 * 128 - 1,
+ 16777215L, 16777216L,
+ 128L * 128 * 128 * 128 - 2, 128L * 128 * 128 * 128 - 1,
+ 2147483647L
).map(Arguments::of);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
similarity index 91%
rename from
modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
index c801e182c0..7ce70a22ad 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
@@ -33,11 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-class VarIntsTest {
+class NaiveVarIntsTest {
@ParameterizedTest
@MethodSource("intRangeBorders")
void writesIntsWithCorrectLengths(IntWriteSpec spec) throws Exception {
- byte[] bytes = writeToBytes(output ->
VarInts.writeUnsignedInt(spec.value, output));
+ byte[] bytes = writeToBytes(output ->
NaiveVarInts.writeUnsignedInt(spec.value, output));
assertThat(bytes.length, is(spec.expectedLength));
}
@@ -73,8 +73,8 @@ class VarIntsTest {
@ParameterizedTest
@MethodSource("intRangeBorders")
void writesAndReadsInts(IntWriteSpec spec) throws Exception {
- byte[] bytes = writeToBytes(output ->
VarInts.writeUnsignedInt(spec.value, output));
- int result = readIntFromBytesConsuming(bytes,
VarInts::readUnsignedInt);
+ byte[] bytes = writeToBytes(output ->
NaiveVarInts.writeUnsignedInt(spec.value, output));
+ int result = readIntFromBytesConsuming(bytes,
NaiveVarInts::readUnsignedInt);
assertThat(result, is(spec.value));
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 5a55828be6..f6b6b169ab 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -31,10 +31,12 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
@@ -177,7 +179,13 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
var clusterNodes2 = Set.of(NODE_1, NODE_2);
- clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(10L, clusterNodes2, clusterId)));
+ clusterStateStorage.put(
+ LOGICAL_TOPOLOGY_KEY,
+ VersionedSerialization.toBytes(
+ new LogicalTopologySnapshot(10L, clusterNodes2,
clusterId),
+ LogicalTopologySnapshotSerializer.INSTANCE
+ )
+ );
topology.fireTopologyLeap();
@@ -198,7 +206,13 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
var clusterNodes2 = Set.of(NODE_1, NODE_2);
- clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(10L, clusterNodes2, clusterId)));
+ clusterStateStorage.put(
+ LOGICAL_TOPOLOGY_KEY,
+ VersionedSerialization.toBytes(
+ new LogicalTopologySnapshot(10L, clusterNodes2,
clusterId),
+ LogicalTopologySnapshotSerializer.INSTANCE
+ )
+ );
keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytesKeepingOrder(11L), KV_UPDATE_CONTEXT);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index 155189179c..b36870ca9f 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -69,6 +69,7 @@ import
org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
import
org.apache.ignite.internal.distributionzones.BaseDistributionZoneManagerTest;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
@@ -85,7 +86,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.server.If;
import
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
import org.apache.ignite.internal.network.ClusterNodeImpl;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
@@ -1284,7 +1285,13 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
long topVer = topology.getLogicalTopology().version() + 1;
- clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(topVer, nodes)));
+ clusterStateStorage.put(
+ LOGICAL_TOPOLOGY_KEY,
+ VersionedSerialization.toBytes(
+ new LogicalTopologySnapshot(topVer, nodes),
+ LogicalTopologySnapshotSerializer.INSTANCE
+ )
+ );
topology.fireTopologyLeap();
diff --git
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
new file mode 100644
index 0000000000..dc6c2f551c
--- /dev/null
+++
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+
+/**
+ * {@link VersionedSerializer} for {@link ClusterNode} instances.
+ */
+public class ClusterNodeSerializer extends VersionedSerializer<ClusterNode> {
+ /** Serializer instance. */
+ public static final ClusterNodeSerializer INSTANCE = new
ClusterNodeSerializer();
+
+ @Override
+ protected void writeExternalData(ClusterNode node, IgniteDataOutput out)
throws IOException {
+ out.writeUuid(node.id());
+ out.writeUTF(node.name());
+
+ out.writeUTF(node.address().host());
+ out.writeVarInt(node.address().port());
+
+ NodeMetadata metadata = node.nodeMetadata();
+ out.writeBoolean(metadata != null);
+ if (metadata != null) {
+ out.writeUTF(metadata.restHost());
+ out.writeVarInt(metadata.httpPort());
+ out.writeVarInt(metadata.httpsPort());
+ }
+ }
+
+ @Override
+ protected ClusterNode readExternalData(byte protoVer, IgniteDataInput in)
throws IOException {
+ UUID id = in.readUuid();
+ String name = in.readUTF();
+ NetworkAddress address = new NetworkAddress(in.readUTF(),
in.readVarIntAsInt());
+
+ boolean hasMetadata = in.readBoolean();
+ NodeMetadata metadata;
+ if (hasMetadata) {
+ metadata = new NodeMetadata(in.readUTF(), in.readVarIntAsInt(),
in.readVarIntAsInt());
+ } else {
+ metadata = null;
+ }
+
+ return new ClusterNodeImpl(id, name, address, metadata);
+ }
+}
diff --git
a/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
b/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
new file mode 100644
index 0000000000..4dcfc3eeae
--- /dev/null
+++
b/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.Base64;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class ClusterNodeSerializerTest {
+ private final ClusterNodeSerializer serializer = new
ClusterNodeSerializer();
+
+ @Test
+ void serializationAndDeserialization() {
+ ClusterNode originalNode = new ClusterNodeImpl(
+ UUID.randomUUID(),
+ "test",
+ new NetworkAddress("host", 3000),
+ new NodeMetadata("ext-host", 3001, 3002)
+ );
+
+ byte[] bytes = VersionedSerialization.toBytes(originalNode,
serializer);
+ ClusterNode restoredNode = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredNode.id(), equalTo(originalNode.id()));
+ assertThat(restoredNode.name(), equalTo("test"));
+ assertThat(restoredNode.address(), equalTo(new NetworkAddress("host",
3000)));
+ assertThat(restoredNode.nodeMetadata(), equalTo(new
NodeMetadata("ext-host", 3001, 3002)));
+ }
+
+ @Test
+ void v1CanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode("Ae++Q5FHweYb0PNweo7gWHuyYaUEdGVzdARob3N0uRcBCGV4dC1ob3N0uhe7Fw==");
+
+ ClusterNode restoredNode = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredNode.id(),
equalTo(UUID.fromString("70f3d01b-e6c1-4791-a561-b27b58e08e7a")));
+ assertThat(restoredNode.name(), equalTo("test"));
+ assertThat(restoredNode.address(), equalTo(new NetworkAddress("host",
3000)));
+ assertThat(restoredNode.nodeMetadata(), equalTo(new
NodeMetadata("ext-host", 3001, 3002)));
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
index d991709b3d..0f775a6d9d 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
@@ -21,7 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;
-import org.apache.ignite.internal.util.io.VarInts;
+import org.apache.ignite.internal.util.io.NaiveVarInts;
/**
* Protocol-wide elements marshalling.
@@ -31,28 +31,28 @@ class ProtocolMarshalling {
static final int MAX_LENGTH_BYTE_COUNT = 4;
static void writeDescriptorOrCommandId(int id, DataOutput output) throws
IOException {
- VarInts.writeUnsignedInt(id, output);
+ NaiveVarInts.writeUnsignedInt(id, output);
}
static int readDescriptorOrCommandId(DataInput input) throws IOException {
- return VarInts.readUnsignedInt(input);
+ return NaiveVarInts.readUnsignedInt(input);
}
static void writeObjectId(int id, DataOutput output) throws IOException {
- VarInts.writeUnsignedInt(id, output);
+ NaiveVarInts.writeUnsignedInt(id, output);
}
static int readObjectId(DataInput input) throws IOException {
- return VarInts.readUnsignedInt(input);
+ return NaiveVarInts.readUnsignedInt(input);
}
static void writeLength(int length, DataOutput output) throws IOException {
- VarInts.writeUnsignedInt(length, output);
+ NaiveVarInts.writeUnsignedInt(length, output);
}
static int readLength(DataInput input) throws IOException {
- return VarInts.readUnsignedInt(input);
+ return NaiveVarInts.readUnsignedInt(input);
}
static void writeFixedLengthBitSet(BitSet bitset, int bitSetLength,
DataOutput output) throws IOException {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
index 2d48f9ae17..2a9a5042e6 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
@@ -64,6 +64,6 @@ public class PartitionCommandsMarshallerImpl extends
OptimizedMarshaller impleme
*/
@Override
public int readRequiredCatalogVersion(ByteBuffer raw) {
- return VarIntUtils.readVarInt(raw);
+ return (int) VarIntUtils.readVarInt(raw);
}
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
index 01dd9d681e..066e8eb360 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -22,12 +22,14 @@ import static
org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import java.util.UUID;
import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
/**
@@ -72,7 +74,8 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
* the cluster yet).
*/
public @Nullable ClusterState readClusterState() {
- return readFromVault(CLUSTER_STATE_VAULT_KEY);
+ VaultEntry entry = vault.get(CLUSTER_STATE_VAULT_KEY);
+ return entry != null ? VersionedSerialization.fromBytes(entry.value(),
ClusterStatePersistentSerializer.INSTANCE) : null;
}
private <T> @Nullable T readFromVault(ByteArray key) {
@@ -81,7 +84,7 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
}
void saveClusterState(ClusterState clusterState) {
- vault.put(CLUSTER_STATE_VAULT_KEY, ByteUtils.toBytes(clusterState));
+ vault.put(CLUSTER_STATE_VAULT_KEY,
VersionedSerialization.toBytes(clusterState,
ClusterStatePersistentSerializer.INSTANCE));
}
boolean isInitConfigApplied() {
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 98719f97b5..d3031a3706 100644
---
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -28,7 +28,6 @@ import static
org.apache.ignite.internal.testframework.asserts.CompletableFuture
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
+import
org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
import
org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
@@ -91,6 +91,7 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
@@ -214,7 +215,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
VaultEntry entry = vaultManager.get(CLUSTER_STATE_VAULT_KEY);
assertThat(entry, is(notNullValue()));
- ClusterState savedState = fromBytes(entry.value());
+ ClusterState savedState =
VersionedSerialization.fromBytes(entry.value(),
ClusterStatePersistentSerializer.INSTANCE);
assertThat(savedState, is(equalTo(usualClusterState)));
}
@@ -285,7 +286,10 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
private void putClusterState() {
- vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
+ vaultManager.put(
+ CLUSTER_STATE_VAULT_KEY,
+ VersionedSerialization.toBytes(usualClusterState,
ClusterStatePersistentSerializer.INSTANCE)
+ );
}
private void markInitConfigApplied() {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
index c42cee416c..2a42165081 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
@@ -64,6 +64,6 @@ public class PartitionCommandsMarshallerImpl extends
OptimizedMarshaller impleme
*/
@Override
public int readRequiredCatalogVersion(ByteBuffer raw) {
- return VarIntUtils.readVarInt(raw);
+ return (int) VarIntUtils.readVarInt(raw);
}
}