This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 71a95cb5bd IGNITE-23436 Do not use ByteUtils#toBytes to persist 
objects related to CMG (#4562)
71a95cb5bd is described below

commit 71a95cb5bd73f759e1480bfc3816d1fce2cfbebe
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 16 16:36:00 2024 +0400

    IGNITE-23436 Do not use ByteUtils#toBytes to persist objects related to CMG 
(#4562)
---
 .../org/apache/ignite/network/ClusterNode.java     |   3 +-
 .../org/apache/ignite/network/NetworkAddress.java  |   3 +-
 modules/cluster-management/build.gradle            |   3 +-
 .../ClusterStatePersistentSerializer.java          | 121 +++++++++++++++++++++
 .../cluster/management/LocalStateStorage.java      |  51 +++++++--
 .../raft/ClusterStateStorageManager.java           |  13 ++-
 .../management/raft/CmgRaftGroupListener.java      |  42 +++----
 .../management/topology/LogicalTopologyImpl.java   |   9 +-
 .../management/topology/api/LogicalNode.java       |  42 ++++---
 .../topology/api/LogicalNodeSerializer.java        |  96 ++++++++++++++++
 .../topology/api/LogicalTopologySnapshot.java      |   5 +-
 .../api/LogicalTopologySnapshotSerializer.java     |  64 +++++++++++
 .../ClusterStatePersistentSerializerTest.java      |  89 +++++++++++++++
 .../cluster/management/LocalStateStorageTest.java  |  81 ++++++++++++++
 .../topology/api/LogicalNodeSerializerTest.java    |  75 +++++++++++++
 .../api/LogicalTopologySnapshotSerializerTest.java |  97 +++++++++++++++++
 .../apache/ignite/internal/util/VarIntUtils.java   |  68 ++++++++++--
 .../ignite/internal/util/io/IgniteDataInput.java   |  22 ++++
 .../ignite/internal/util/io/IgniteDataOutput.java  |   9 ++
 .../internal/util/io/IgniteUnsafeDataInput.java    |  15 ++-
 .../internal/util/io/IgniteUnsafeDataOutput.java   |  15 ++-
 .../util/io/{VarInts.java => NaiveVarInts.java}    |  12 +-
 .../internal/versioned/VersionedSerialization.java |  82 ++++++++++++++
 .../internal/versioned/VersionedSerializer.java    |  98 +++++++++++++++++
 .../ignite/internal/util/VarIntUtilsTest.java      |  78 +++++++++----
 .../io/{VarIntsTest.java => NaiveVarIntsTest.java} |   8 +-
 ...butionZoneManagerLogicalTopologyEventsTest.java |  18 ++-
 .../DistributionZoneCausalityDataNodesTest.java    |  11 +-
 .../internal/network/ClusterNodeSerializer.java    |  69 ++++++++++++
 .../network/ClusterNodeSerializerTest.java         |  63 +++++++++++
 .../serialization/marshal/ProtocolMarshalling.java |  14 +--
 .../PartitionCommandsMarshallerImpl.java           |   2 +-
 .../system/SystemDisasterRecoveryStorage.java      |   7 +-
 .../SystemDisasterRecoveryManagerImplTest.java     |  10 +-
 .../schema/PartitionCommandsMarshallerImpl.java    |   2 +-
 35 files changed, 1271 insertions(+), 126 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java 
b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index b3a8cadaa4..674f9adf83 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -17,14 +17,13 @@
 
 package org.apache.ignite.network;
 
-import java.io.Serializable;
 import java.util.UUID;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Representation of a node in a cluster.
  */
-public interface ClusterNode extends Serializable {
+public interface ClusterNode {
     /**
      * Returns the node's local ID.
      *
diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java 
b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
index b909fd3080..9c8163fa91 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.network;
 
-import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -25,7 +24,7 @@ import java.util.regex.Pattern;
 /**
  * Representation of a network address that includes a host name and a port.
  */
-public class NetworkAddress implements Serializable {
+public class NetworkAddress {
     /** Regexp for parsing strings in the "host:port" format. */
     private static final Pattern ADDRESS_PATTERN = 
Pattern.compile("(.+):(\\d+)");
 
diff --git a/modules/cluster-management/build.gradle 
b/modules/cluster-management/build.gradle
index b702a46b15..0aa2d77855 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -49,10 +49,11 @@ dependencies {
     testImplementation project(':ignite-core')
     testImplementation project(':ignite-configuration')
     testImplementation project(':ignite-network')
+    testImplementation project(':ignite-system-disaster-recovery')
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-configuration'))
     testImplementation testFixtures(project(':ignite-network'))
-    testImplementation project(':ignite-system-disaster-recovery')
+    testImplementation testFixtures(project(':ignite-vault'))
 
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.junit
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
new file mode 100644
index 0000000000..7bd3240751
--- /dev/null
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link VersionedSerializer} for {@link ClusterState} instance.
+ */
+public class ClusterStatePersistentSerializer extends 
VersionedSerializer<ClusterState> {
+    private static final CmgMessagesFactory CMG_MSGS_FACTORY = new 
CmgMessagesFactory();
+
+    /** Serializer instance. */
+    public static final ClusterStatePersistentSerializer INSTANCE = new 
ClusterStatePersistentSerializer();
+
+    @Override
+    protected void writeExternalData(ClusterState state, IgniteDataOutput out) 
throws IOException {
+        writeStringSet(state.cmgNodes(), out);
+        writeStringSet(state.metaStorageNodes(), out);
+        out.writeUTF(state.version());
+        out.writeUTF(state.clusterTag().clusterName());
+        out.writeUuid(state.clusterTag().clusterId());
+        writeNullableString(state.initialClusterConfiguration(), out);
+
+        List<UUID> formerClusterIds = state.formerClusterIds();
+        out.writeVarInt(formerClusterIds == null ? -1 : 
formerClusterIds.size());
+        if (formerClusterIds != null) {
+            for (UUID clusterId : formerClusterIds) {
+                out.writeUuid(clusterId);
+            }
+        }
+    }
+
+    private static void writeStringSet(Set<String> strings, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(strings.size());
+        for (String str : strings) {
+            out.writeUTF(str);
+        }
+    }
+
+    /**
+     * Writes a nullable string as a varint length prefix followed by the string's UTF-8 bytes.
+     *
+     * <p>The prefix is {@code -1} for {@code null}; otherwise it is the number of encoded bytes (not
+     * {@link String#length()}), because {@code readNullableString} hands the prefix to
+     * {@code readByteArray} as a byte count. For ASCII-only strings the byte layout is the same as
+     * when the char count was written, so such previously persisted values stay readable.
+     *
+     * @param str String to write, or {@code null}.
+     * @param out Output to write to.
+     * @throws IOException If writing to the output fails.
+     */
+    private static void writeNullableString(@Nullable String str, IgniteDataOutput out) throws IOException {
+        if (str == null) {
+            out.writeVarInt(-1);
+            return;
+        }
+
+        // Encode first: for non-ASCII text the UTF-8 byte count differs from the UTF-16 char count.
+        byte[] bytes = str.getBytes(UTF_8);
+        out.writeVarInt(bytes.length);
+        out.writeByteArray(bytes);
+    }
+
+    @Override
+    protected ClusterState readExternalData(byte protoVer, IgniteDataInput in) 
throws IOException {
+        return CMG_MSGS_FACTORY.clusterState()
+                .cmgNodes(readStringSet(in))
+                .metaStorageNodes(readStringSet(in))
+                .version(in.readUTF())
+                .clusterTag(ClusterTag.clusterTag(CMG_MSGS_FACTORY, 
in.readUTF(), in.readUuid()))
+                .initialClusterConfiguration(readNullableString(in))
+                .formerClusterIds(readFormerClusterIds(in))
+                .build();
+    }
+
+    private static Set<String> readStringSet(IgniteDataInput in) throws 
IOException {
+        int size = in.readVarIntAsInt();
+
+        Set<String> result = new HashSet<>(size);
+        for (int i = 0; i < size; i++) {
+            result.add(in.readUTF());
+        }
+
+        return result;
+    }
+
+    private static @Nullable String readNullableString(IgniteDataInput in) 
throws IOException {
+        int lengthOrMinusOne = in.readVarIntAsInt();
+        if (lengthOrMinusOne == -1) {
+            return null;
+        }
+
+        return new String(in.readByteArray(lengthOrMinusOne), UTF_8);
+    }
+
+    private static @Nullable List<UUID> readFormerClusterIds(IgniteDataInput 
in) throws IOException {
+        int length = in.readVarIntAsInt();
+
+        if (length == -1) {
+            return null;
+        }
+
+        List<UUID> result = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            result.add(in.readUuid());
+        }
+
+        return result;
+    }
+}
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
index d708e5e038..a23d88f202 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/LocalStateStorage.java
@@ -17,12 +17,17 @@
 
 package org.apache.ignite.internal.cluster.management;
 
-import java.io.Serializable;
+import java.io.IOException;
+import java.util.HashSet;
 import java.util.Set;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -33,9 +38,7 @@ import org.jetbrains.annotations.Nullable;
 class LocalStateStorage {
     private static final ByteArray CMG_STATE_VAULT_KEY = 
ByteArray.fromString("cmg_state");
 
-    static class LocalState implements Serializable {
-        private static final long serialVersionUID = -5069326157367860480L;
-
+    static class LocalState {
         private final Set<String> cmgNodeNames;
 
         private final ClusterTag clusterTag;
@@ -68,7 +71,11 @@ class LocalStateStorage {
     @Nullable LocalState getLocalState() {
         VaultEntry entry = vault.get(CMG_STATE_VAULT_KEY);
 
-        return entry == null ? null : ByteUtils.fromBytes(entry.value());
+        if (entry == null) {
+            return null;
+        }
+
+        return VersionedSerialization.fromBytes(entry.value(), 
LocalStateSerializer.INSTANCE);
     }
 
     /**
@@ -77,7 +84,7 @@ class LocalStateStorage {
      * @param state Local state to save.
      */
     void saveLocalState(LocalState state) {
-        vault.put(CMG_STATE_VAULT_KEY, ByteUtils.toBytes(state));
+        vault.put(CMG_STATE_VAULT_KEY, VersionedSerialization.toBytes(state, 
LocalStateSerializer.INSTANCE));
     }
 
     /**
@@ -86,4 +93,34 @@ class LocalStateStorage {
     void clear() {
         vault.remove(CMG_STATE_VAULT_KEY);
     }
+
+    private static class LocalStateSerializer extends 
VersionedSerializer<LocalState> {
+        private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new 
CmgMessagesFactory();
+
+        private static final LocalStateSerializer INSTANCE = new 
LocalStateSerializer();
+
+        @Override
+        protected void writeExternalData(LocalState state, IgniteDataOutput 
out) throws IOException {
+            out.writeVarInt(state.cmgNodeNames().size());
+            for (String cmgNodeName : state.cmgNodeNames()) {
+                out.writeUTF(cmgNodeName);
+            }
+
+            out.writeUTF(state.clusterTag().clusterName());
+            out.writeUuid(state.clusterTag().clusterId());
+        }
+
+        @Override
+        protected LocalState readExternalData(byte protoVer, IgniteDataInput 
in) throws IOException {
+            int cmgNodesCount = in.readVarIntAsInt();
+            Set<String> cmgNodeNames = new HashSet<>(cmgNodesCount);
+            for (int i = 0; i < cmgNodesCount; i++) {
+                cmgNodeNames.add(in.readUTF());
+            }
+
+            ClusterTag clusterTag = 
ClusterTag.clusterTag(CMG_MESSAGES_FACTORY, in.readUTF(), in.readUuid());
+
+            return new LocalState(cmgNodeNames, clusterTag);
+        }
+    }
 }
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index 44fe6a989f..d4365a105b 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.cluster.management.raft;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToUuid;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 
 import java.nio.ByteBuffer;
@@ -31,7 +29,10 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import 
org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalNodeSerializer;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -62,7 +63,7 @@ public class ClusterStateStorageManager {
     public ClusterState getClusterState() {
         byte[] value = storage.get(CMG_STATE_KEY);
 
-        return value == null ? null : fromBytes(value);
+        return value == null ? null : VersionedSerialization.fromBytes(value, 
ClusterStatePersistentSerializer.INSTANCE);
     }
 
     /**
@@ -71,7 +72,7 @@ public class ClusterStateStorageManager {
      * @param state Cluster state.
      */
     public void putClusterState(ClusterState state) {
-        storage.put(CMG_STATE_KEY, toBytes(state));
+        storage.put(CMG_STATE_KEY, VersionedSerialization.toBytes(state, 
ClusterStatePersistentSerializer.INSTANCE));
     }
 
     /**
@@ -87,7 +88,7 @@ public class ClusterStateStorageManager {
      * Marks the given node as validated.
      */
     void putValidatedNode(LogicalNode node) {
-        storage.put(validatedNodeKey(node.id()), toBytes(node));
+        storage.put(validatedNodeKey(node.id()), 
VersionedSerialization.toBytes(node, LogicalNodeSerializer.INSTANCE));
     }
 
     /**
@@ -110,7 +111,7 @@ public class ClusterStateStorageManager {
      * Returns a collection of nodes that passed the validation but have not 
yet joined the logical topology.
      */
     List<LogicalNode> getValidatedNodes() {
-        return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) -> 
fromBytes(v));
+        return storage.getWithPrefix(VALIDATED_NODE_PREFIX, (k, v) -> 
VersionedSerialization.fromBytes(v, LogicalNodeSerializer.INSTANCE));
     }
 
     /**
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 0d724fdd92..297ad77350 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.cluster.management.raft;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Objects.requireNonNullElse;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.util.IgniteUtils.capacity;
 
 import java.io.Serializable;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -237,17 +239,15 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
     }
 
     private ValidationResult validateNode(JoinRequestCommand command) {
-        ClusterNode node = command.node().asClusterNode();
-
         Optional<LogicalNode> previousVersion = 
logicalTopology.getLogicalTopology().nodes()
                 .stream()
-                .filter(n -> n.name().equals(node.name()))
+                .filter(n -> n.name().equals(command.node().name()))
                 .findAny();
 
         if (previousVersion.isPresent()) {
             LogicalNode previousNode = previousVersion.get();
 
-            if (previousNode.id().equals(node.id())) {
+            if (previousNode.id().equals(command.node().id())) {
                 return ValidationResult.successfulResult();
             } else {
                 // Remove the previous node from the Logical Topology in case 
we haven't received the disappeared event yet.
@@ -255,25 +255,15 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
             }
         }
 
-        LogicalNode logicalNode = new LogicalNode(
-                node,
-                command.node().userAttributes(),
-                command.node().systemAttributes(),
-                command.node().storageProfiles()
-        );
+        LogicalNode logicalNode = 
logicalNodeFromClusterNodeMessage(command.node());
 
         return 
validationManager.validateNode(storageManager.getClusterState(), logicalNode, 
command.igniteVersion(), command.clusterTag());
     }
 
     private ValidationResult completeValidation(JoinReadyCommand command) {
-        ClusterNode node = command.node().asClusterNode();
+        ClusterNodeMessage clusterNodeMessage = command.node();
 
-        LogicalNode logicalNode = new LogicalNode(
-                node,
-                command.node().userAttributes(),
-                command.node().systemAttributes(),
-                command.node().storageProfiles()
-        );
+        LogicalNode logicalNode = 
logicalNodeFromClusterNodeMessage(clusterNodeMessage);
 
         if (validationManager.isNodeValidated(logicalNode)) {
             ValidationResult validationResponse = 
validationManager.completeValidation(logicalNode);
@@ -284,16 +274,28 @@ public class CmgRaftGroupListener implements 
RaftGroupListener {
 
             return validationResponse;
         } else {
-            return ValidationResult.errorResult(String.format("Node \"%s\" has 
not yet passed the validation step", node));
+            return ValidationResult.errorResult(String.format("Node \"%s\" has 
not yet passed the validation step",
+                    clusterNodeMessage.asClusterNode()));
         }
     }
 
+    private static LogicalNode 
logicalNodeFromClusterNodeMessage(ClusterNodeMessage message) {
+        ClusterNode node = message.asClusterNode();
+
+        return new LogicalNode(
+                node,
+                requireNonNullElse(message.userAttributes(), emptyMap()),
+                requireNonNullElse(message.systemAttributes(), emptyMap()),
+                requireNonNullElse(message.storageProfiles(), emptyList())
+        );
+    }
+
     private void removeNodesFromLogicalTopology(NodesLeaveCommand command) {
         Set<ClusterNode> nodes = 
command.nodes().stream().map(ClusterNodeMessage::asClusterNode).collect(Collectors.toSet());
 
         // Nodes will be removed from a topology, so it is safe to set 
nodeAttributes to the default value
         Set<LogicalNode> logicalNodes = nodes.stream()
-                .map(n -> new LogicalNode(n, Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyList()))
+                .map(n -> new LogicalNode(n, emptyMap(), emptyMap(), 
emptyList()))
                 .collect(Collectors.toSet());
 
         logicalTopology.removeNodes(logicalNodes);
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 4c0a1495cd..4f634e4452 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -22,8 +22,6 @@ import static java.util.Comparator.comparing;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,8 +36,10 @@ import 
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageMan
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -74,7 +74,8 @@ public class LogicalTopologyImpl implements LogicalTopology {
     private LogicalTopologySnapshot readLogicalTopology() {
         byte[] bytes = storage.get(LOGICAL_TOPOLOGY_KEY);
 
-        return bytes == null ? LogicalTopologySnapshot.INITIAL : 
fromBytes(bytes);
+        return bytes == null ? LogicalTopologySnapshot.INITIAL
+                : VersionedSerialization.fromBytes(bytes, 
LogicalTopologySnapshotSerializer.INSTANCE);
     }
 
     @Override
@@ -152,7 +153,7 @@ public class LogicalTopologyImpl implements LogicalTopology 
{
     }
 
     private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) {
-        storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
+        storage.put(LOGICAL_TOPOLOGY_KEY, 
VersionedSerialization.toBytes(newTopology, 
LogicalTopologySnapshotSerializer.INSTANCE));
     }
 
     @Override
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
index d4367dd456..9577d07a5f 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNode.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.cluster.management.topology.api;
 
-import java.util.Collections;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -26,6 +28,8 @@ import 
org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Representation of a logical node in a cluster.
@@ -57,11 +61,7 @@ public class LogicalNode extends ClusterNodeImpl {
      * @param address Node address.
      */
     public LogicalNode(UUID id, String name, NetworkAddress address) {
-        super(id, name, address);
-
-        this.userAttributes = Collections.emptyMap();
-        this.systemAttributes = Collections.emptyMap();
-        this.storageProfiles = Collections.emptyList();
+        this(id, name, address, null, emptyMap(), emptyMap(), emptyList());
     }
 
     /**
@@ -71,28 +71,24 @@ public class LogicalNode extends ClusterNodeImpl {
      * @param userAttributes  Node attributes defined in configuration.
      */
     public LogicalNode(ClusterNode clusterNode, Map<String, String> 
userAttributes) {
-        this(clusterNode, userAttributes, Collections.emptyMap(), 
Collections.emptyList());
+        this(clusterNode, userAttributes, emptyMap(), emptyList());
     }
 
     /**
      * Constructor.
      *
-     * @param clusterNode Represents a node in a cluster.
+     * @param node Represents a node in a cluster.
      * @param userAttributes Node attributes defined in configuration.
      * @param systemAttributes Internal node attributes provided by system 
components at startup.
      * @param storageProfiles List of storage profiles, which the node 
supports.
      */
     public LogicalNode(
-            ClusterNode clusterNode,
+            ClusterNode node,
             Map<String, String> userAttributes,
             Map<String, String> systemAttributes,
             List<String> storageProfiles
     ) {
-        super(clusterNode.id(), clusterNode.name(), clusterNode.address(), 
clusterNode.nodeMetadata());
-
-        this.userAttributes = userAttributes == null ? Collections.emptyMap() 
: userAttributes;
-        this.systemAttributes = systemAttributes == null ? 
Collections.emptyMap() : systemAttributes;
-        this.storageProfiles = storageProfiles;
+        this(node.id(), node.name(), node.address(), node.nodeMetadata(), 
userAttributes, systemAttributes, storageProfiles);
     }
 
     /**
@@ -101,7 +97,23 @@ public class LogicalNode extends ClusterNodeImpl {
      * @param clusterNode    Represents a node in a cluster.
      */
     public LogicalNode(ClusterNode clusterNode) {
-        this(clusterNode, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyList());
+        this(clusterNode, emptyMap(), emptyMap(), emptyList());
+    }
+
+    LogicalNode(
+            UUID id,
+            String name,
+            NetworkAddress address,
+            @Nullable NodeMetadata nodeMetadata,
+            Map<String, String> userAttributes,
+            Map<String, String> systemAttributes,
+            List<String> storageProfiles
+    ) {
+        super(id, name, address, nodeMetadata);
+
+        this.userAttributes = Map.copyOf(userAttributes);
+        this.systemAttributes = Map.copyOf(systemAttributes);
+        this.storageProfiles = List.copyOf(storageProfiles);
     }
 
     /**
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
new file mode 100644
index 0000000000..6a7a782bf5
--- /dev/null
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.ignite.internal.network.ClusterNodeSerializer;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * {@link VersionedSerializer} for {@link LogicalNode} instances.
+ */
+public class LogicalNodeSerializer extends VersionedSerializer<LogicalNode> {
+    /** Serializer instance. */
+    public static final LogicalNodeSerializer INSTANCE = new LogicalNodeSerializer();
+
+    private final ClusterNodeSerializer clusterNodeSerializer = ClusterNodeSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(LogicalNode node, IgniteDataOutput out) throws IOException {
+        clusterNodeSerializer.writeExternal(node, out);
+
+        writeStringToStringMap(node.userAttributes(), out);
+        writeStringToStringMap(node.systemAttributes(), out);
+
+        out.writeVarInt(node.storageProfiles().size());
+        for (String profile : node.storageProfiles()) {
+            out.writeUTF(profile);
+        }
+    }
+
+    private static void writeStringToStringMap(Map<String, String> map, IgniteDataOutput output) throws IOException {
+        output.writeVarInt(map.size());
+
+        for (Entry<String, String> entry : map.entrySet()) {
+            output.writeUTF(entry.getKey());
+            output.writeUTF(entry.getValue());
+        }
+    }
+
+    @Override
+    protected LogicalNode readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        ClusterNode node = clusterNodeSerializer.readExternal(in);
+
+        Map<String, String> userAttributes = readStringToStringMap(in);
+        Map<String, String> systemAttributes = readStringToStringMap(in);
+        List<String> storageProfiles = readStringList(in);
+
+        return new LogicalNode(node, userAttributes, systemAttributes, storageProfiles);
+    }
+
+    private static Map<String, String> readStringToStringMap(IgniteDataInput in) throws IOException {
+        int size = in.readVarIntAsInt();
+
+        var map = new HashMap<String, String>(IgniteUtils.capacity(size));
+        for (int i = 0; i < size; i++) {
+            map.put(in.readUTF(), in.readUTF());
+        }
+
+        return map;
+    }
+
+    private static List<String> readStringList(IgniteDataInput in) throws IOException {
+        int size = in.readVarIntAsInt();
+
+        var list = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            list.add(in.readUTF());
+        }
+
+        return list;
+    }
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
index ed2823b5dc..79b29d93d3 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshot.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.cluster.management.topology.api;
 
 import static java.util.Collections.emptySet;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
@@ -33,9 +32,7 @@ import org.jetbrains.annotations.TestOnly;
  *
  * <p>Instances of this class are immutable.
  */
-public class LogicalTopologySnapshot implements Serializable {
-    private static final long serialVersionUID = 0L;
-
+public class LogicalTopologySnapshot {
     /** Version that first topology snapshot in history will have. */
     public static final long FIRST_VERSION = 1;
 
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java
new file mode 100644
index 0000000000..e281ab9198
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+/**
+ * {@link VersionedSerializer} for {@link LogicalTopologySnapshot} instances.
+ */
+public class LogicalTopologySnapshotSerializer extends VersionedSerializer<LogicalTopologySnapshot> {
+    /** Serializer instance. */
+    public static final LogicalTopologySnapshotSerializer INSTANCE = new LogicalTopologySnapshotSerializer();
+
+    private final LogicalNodeSerializer logicalNodeSerializer = LogicalNodeSerializer.INSTANCE;
+
+    @Override
+    protected void writeExternalData(LogicalTopologySnapshot snapshot, IgniteDataOutput out) throws IOException {
+        out.writeVarInt(snapshot.version());
+
+        out.writeVarInt(snapshot.nodes().size());
+        for (LogicalNode node : snapshot.nodes()) {
+            logicalNodeSerializer.writeExternal(node, out);
+        }
+
+        out.writeUuid(snapshot.clusterId());
+    }
+
+    @Override
+    protected LogicalTopologySnapshot readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        long version = in.readVarInt();
+
+        int nodesCount = in.readVarIntAsInt();
+        Set<LogicalNode> nodes = new HashSet<>(IgniteUtils.capacity(nodesCount));
+        for (int i = 0; i < nodesCount; i++) {
+            nodes.add(logicalNodeSerializer.readExternal(in));
+        }
+
+        UUID clusterId = in.readUuid();
+
+        return new LogicalTopologySnapshot(version, nodes, clusterId);
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java
new file mode 100644
index 0000000000..eb8191bb5c
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterStatePersistentSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.UUID.randomUUID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+
+class ClusterStatePersistentSerializerTest {
+    private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new CmgMessagesFactory();
+
+    private final ClusterStatePersistentSerializer serializer = new ClusterStatePersistentSerializer();
+
+    @Test
+    void serializationAndDeserializationWithoutNulls() {
+        ClusterState originalState = CMG_MESSAGES_FACTORY.clusterState()
+                .cmgNodes(Set.of("a", "b"))
+                .metaStorageNodes(Set.of("c", "d"))
+                .version("3.0.0")
+                .clusterTag(ClusterTag.randomClusterTag(CMG_MESSAGES_FACTORY, "cluster"))
+                .initialClusterConfiguration("config")
+                .formerClusterIds(List.of(randomUUID(), randomUUID()))
+                .build();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalState, serializer);
+        ClusterState restoredState = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredState, is(equalTo(originalState)));
+    }
+
+    @Test
+    void serializationAndDeserializationWithNulls() {
+        ClusterState originalState = CMG_MESSAGES_FACTORY.clusterState()
+                .cmgNodes(Set.of("a", "b"))
+                .metaStorageNodes(Set.of("c", "d"))
+                .version("3.0.0")
+                .clusterTag(ClusterTag.randomClusterTag(CMG_MESSAGES_FACTORY, "cluster"))
+                .initialClusterConfiguration(null)
+                .formerClusterIds(null)
+                .build();
+
+        byte[] bytes = VersionedSerialization.toBytes(originalState, serializer);
+        ClusterState restoredState = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredState, is(equalTo(originalState)));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode("Ae++QwMBYQFiAwFjAWQFMy4wLjAHY2x1c3Rlcp1Ct7dR35ELuoeboFbabrgHY29uZmlnAztFBahaoEfJtxGam"
+                + "Q6WXJNFRfruL76Bv254dP54iF6V");
+        ClusterState restoredState = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredState.cmgNodes(), containsInAnyOrder("a", "b"));
+        assertThat(restoredState.metaStorageNodes(), containsInAnyOrder("c", "d"));
+        assertThat(restoredState.version(), is("3.0.0"));
+        assertThat(restoredState.initialClusterConfiguration(), is("config"));
+        assertThat(
+                restoredState.formerClusterIds(),
+                contains(UUID.fromString("c947a05a-a805-453b-935c-960e999a11b7"), UUID.fromString("bf81be2f-eefa-4545-955e-8878fe74786e"))
+        );
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java
new file mode 100644
index 0000000000..ca97f2b7be
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/LocalStateStorageTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Base64;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.cluster.management.LocalStateStorage.LocalState;
+import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LocalStateStorageTest {
+    private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new CmgMessagesFactory();
+
+    private VaultManager vault;
+
+    private LocalStateStorage storage;
+
+    @BeforeEach
+    void setUp() {
+        vault = new VaultManager(new InMemoryVaultService());
+
+        assertThat(vault.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        storage = new LocalStateStorage(vault);
+    }
+
+    @Test
+    void serializationAndDeserialization() {
+        LocalState originalState = new LocalState(
+                Set.of("a", "b"),
+                ClusterTag.clusterTag(CMG_MESSAGES_FACTORY, "cluster", new UUID(0x12345678L, 0x87654321L))
+        );
+
+        storage.saveLocalState(originalState);
+
+        LocalState restoredState = storage.getLocalState();
+
+        assertThat(restoredState, is(notNullValue()));
+        assertThat(restoredState.cmgNodeNames(), containsInAnyOrder("a", "b"));
+        assertThat(restoredState.clusterTag(), is(originalState.clusterTag()));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        vault.put(new ByteArray("cmg_state"), Base64.getDecoder().decode("Ae++QwMBYQFiB2NsdXN0ZXLvzauQeFY0EiFDZYcJutz+"));
+
+        LocalState localState = storage.getLocalState();
+
+        assertThat(localState, is(notNullValue()));
+        assertThat(localState.cmgNodeNames(), containsInAnyOrder("a", "b"));
+        assertThat(localState.clusterTag().clusterName(), is("cluster"));
+        assertThat(localState.clusterTag().clusterId(), is(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java
new file mode 100644
index 0000000000..72d2af3dd6
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalNodeSerializerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class LogicalNodeSerializerTest {
+    private final LogicalNodeSerializer serializer = new LogicalNodeSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        LogicalNode originalNode = new LogicalNode(
+                UUID.randomUUID(),
+                "test",
+                new NetworkAddress("host", 3000),
+                new NodeMetadata("ext-host", 3001, 3002),
+                Map.of("ukey", "uval"),
+                Map.of("skey", "sval"),
+                List.of("profile")
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalNode, serializer);
+        LogicalNode restoredNode = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNode.id(), equalTo(originalNode.id()));
+        assertThat(restoredNode.name(), equalTo("test"));
+        assertThat(restoredNode.address(), equalTo(new NetworkAddress("host", 3000)));
+        assertThat(restoredNode.nodeMetadata(), equalTo(new NodeMetadata("ext-host", 3001, 3002)));
+        assertThat(restoredNode.userAttributes(), equalTo(Map.of("ukey", "uval")));
+        assertThat(restoredNode.systemAttributes(), equalTo(Map.of("skey", "sval")));
+        assertThat(restoredNode.storageProfiles(), equalTo(List.of("profile")));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode("Ae++QwHvvkPvzauQeFY0EiFDZYcJutz+BHRlc3QEaG9zdLkXAQhleHQtaG9zdLoXuxcCBHVrZXkEdXZhbAI"
+                + "Ec2tleQRzdmFsAgdwcm9maWxl");
+
+        LogicalNode restoredNode = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredNode.id(), is(new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+        assertThat(restoredNode.name(), equalTo("test"));
+        assertThat(restoredNode.address(), equalTo(new NetworkAddress("host", 3000)));
+        assertThat(restoredNode.nodeMetadata(), equalTo(new NodeMetadata("ext-host", 3001, 3002)));
+        assertThat(restoredNode.userAttributes(), equalTo(Map.of("ukey", "uval")));
+        assertThat(restoredNode.systemAttributes(), equalTo(Map.of("skey", "sval")));
+        assertThat(restoredNode.storageProfiles(), equalTo(List.of("profile")));
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java
new file mode 100644
index 0000000000..dcc911a5da
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/api/LogicalTopologySnapshotSerializerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology.api;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class LogicalTopologySnapshotSerializerTest {
+    private final LogicalTopologySnapshotSerializer serializer = new LogicalTopologySnapshotSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        LogicalTopologySnapshot originalSnapshot = new LogicalTopologySnapshot(123L, List.of(node(0), node(1)), new UUID(1, 2));
+
+        byte[] bytes = VersionedSerialization.toBytes(originalSnapshot, serializer);
+        LogicalTopologySnapshot restoredSnapshot = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredSnapshot.version(), is(originalSnapshot.version()));
+        assertThat(restoredSnapshot.nodes(), equalTo(originalSnapshot.nodes()));
+        assertThat(restoredSnapshot.clusterId(), equalTo(originalSnapshot.clusterId()));
+    }
+
+    private static LogicalNode node(int index) {
+        return new LogicalNode(
+                new UUID(0xDEADBEEFCAFEBABEL, index),
+                "node" + index,
+                new NetworkAddress("host" + index, 3000 + index),
+                new NodeMetadata("rest-host" + index, 80 + index, 443 + index),
+                Map.of("ukey", "uval" + index),
+                Map.of("skey", "sval" + index),
+                List.of("prof1", "prof2")
+        );
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = Base64.getDecoder().decode("Ae++Q3wDAe++QwHvvkO+uv7K776t3gAAAAAAAAAABW5vZGUwBWhvc3QwuRcBCnJlc3QtaG9zdDBRvAMCBHVrZ"
+                + "XkFdXZhbDACBHNrZXkFc3ZhbDADBXByb2YxBXByb2YyAe++QwHvvkO+uv7K776t3gEAAAAAAAAABW5vZGUxBWhvc3QxuhcBCnJlc3QtaG9zdDFSvQMCBH"
+                + "VrZXkFdXZhbDECBHNrZXkFc3ZhbDEDBXByb2YxBXByb2YyAQAAAAAAAAACAAAAAAAAAA==");
+        LogicalTopologySnapshot restoredSnapshot = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredSnapshot.version(), is(123L));
+        assertThat(restoredSnapshot.nodes(), hasSize(2));
+
+        List<LogicalNode> orderedNodes = restoredSnapshot.nodes().stream()
+                .sorted(Comparator.comparing(LogicalNode::id))
+                .collect(toList());
+
+        LogicalNode node0 = orderedNodes.get(0);
+        assertThat(node0.id(), is(new UUID(0xDEADBEEFCAFEBABEL, 0)));
+        assertThat(node0.name(), is("node0"));
+        assertThat(node0.address(), is(new NetworkAddress("host0", 3000)));
+        assertThat(node0.nodeMetadata(), is(new NodeMetadata("rest-host0", 80, 443)));
+        assertThat(node0.userAttributes(), is(Map.of("ukey", "uval0")));
+        assertThat(node0.systemAttributes(), is(Map.of("skey", "sval0")));
+        assertThat(node0.storageProfiles(), is(List.of("prof1", "prof2")));
+
+        LogicalNode node1 = orderedNodes.get(1);
+        assertThat(node1.id(), is(new UUID(0xDEADBEEFCAFEBABEL, 1)));
+        assertThat(node1.name(), is("node1"));
+        assertThat(node1.address(), is(new NetworkAddress("host1", 3001)));
+        assertThat(node1.nodeMetadata(), is(new NodeMetadata("rest-host1", 81, 444)));
+        assertThat(node1.userAttributes(), is(Map.of("ukey", "uval1")));
+        assertThat(node1.systemAttributes(), is(Map.of("skey", "sval1")));
+        assertThat(node1.storageProfiles(), is(List.of("prof1", "prof2")));
+
+        assertThat(restoredSnapshot.clusterId(), equalTo(new UUID(1, 2)));
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
index 0688656c09..6a43e19ab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/VarIntUtils.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.util;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
@@ -35,16 +38,16 @@ import java.nio.ByteBuffer;
  */
 public class VarIntUtils {
     /**
-     * Returns number of bytes that are needed to represent the given integer as a varint.
+     * Returns number of bytes that are needed to represent the given long as a varint.
      *
      * @param val Int value.
      */
-    public static int varIntLength(int val) {
+    public static int varIntLength(long val) {
         val++;
 
         int len = 0;
 
-        while ((val & 0xFFFF_FF80) != 0) {
+        while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
             len++;
 
             val >>>= 7;
@@ -53,6 +56,31 @@ public class VarIntUtils {
         return len + 1;
     }
 
+    /**
+     * Writes a primitive {@code long} value as a varint to the provided output.
+     *
+     * @param val Value.
+     * @return Number of bytes written.
+     */
+    public static int writeVarInt(long val, DataOutput out) throws IOException {
+        val++;
+
+        int written = 0;
+
+        while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+            byte b = (byte) (val | 0x80);
+
+            out.writeByte(b);
+
+            val >>>= 7;
+            written++;
+        }
+
+        out.writeByte((byte) val);
+
+        return written + 1;
+    }
+
     /**
      * Writes a primitive {@code int} value as a varint to the provided array.
      *
@@ -61,12 +89,12 @@ public class VarIntUtils {
      * @param off Offset in the target array to write result to.
      * @return Number of bytes overwritten in {@code bytes} array.
      */
-    public static int putVarIntToBytes(int val, byte[] bytes, int off) {
+    public static int putVarIntToBytes(long val, byte[] bytes, int off) {
         val++;
 
         int pos = off;
 
-        while ((val & 0xFFFF_FF80) != 0) {
+        while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
             byte b = (byte) (val | 0x80);
 
             bytes[pos++] = b;
@@ -83,15 +111,37 @@ public class VarIntUtils {
      * Reads a varint from a buffer.
      *
      * @param buf Buffer from which to read.
-     * @return Integer value that was encoded as a varint.
+     * @return Long value that was encoded as a varint.
      */
-    public static int readVarInt(ByteBuffer buf) {
-        int res = 0;
+    public static long readVarInt(ByteBuffer buf) {
+        long res = 0;
 
         for (int shift = 0; ; shift += 7) {
             byte b = buf.get();
 
-            res |= (b & 0x7F) << shift;
+            res |= ((long) b & 0x7F) << shift;
+
+            if (b >= 0) {
+                break;
+            }
+        }
+
+        return res - 1;
+    }
+
+    /**
+     * Reads a varint from an input.
+     *
+     * @param in Input from which to read.
+     * @return Long value that was encoded as a varint.
+     */
+    public static long readVarInt(DataInput in) throws IOException {
+        long res = 0;
+
+        for (int shift = 0; ; shift += 7) {
+            byte b = in.readByte();
+
+            res |= ((long) b & 0x7F) << shift;
 
             if (b >= 0) {
                 break;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
index 0722241f6e..93d449e270 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
@@ -95,6 +95,28 @@ public interface IgniteDataInput extends DataInput {
      */
     int read(byte[] b, int off, int len) throws IOException;
 
+    /**
+     * Reads a long value as a varint.
+     *
+     * @throws IOException If something goes wrong.
+     * @see IgniteDataOutput#writeVarInt(long)
+     */
+    long readVarInt() throws IOException;
+
+    /**
+     * Reads an int value as a varint.
+     *
+     * @throws IOException If something goes wrong.
+     * @see IgniteDataOutput#writeVarInt(long)
+     */
+    default int readVarIntAsInt() throws IOException {
+        long val = readVarInt();
+        if (val < Integer.MIN_VALUE || val > Integer.MAX_VALUE) {
+            throw new IOException("The value is expected to fit into int range, but it doesn't: " + val);
+        }
+        return (int) val;
+    }
+
     /**
      * Reads array of {@code byte}s.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
index c1d54f24d8..befdacf036 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
@@ -75,6 +75,15 @@ public interface IgniteDataOutput extends DataOutput {
      */
     void cleanup();
 
+    /**
+     * Writes a long value as a varint. Non-negative values and -1 are encoded efficiently with respect to compactness.
+     * Negative values (like -2) take a lot more space.
+     *
+     * @param val Value to write.
+     * @throws IOException If something goes wrong.
+     */
+    void writeVarInt(long val) throws IOException;
+
     /**
      * Writes array of {@code byte}s.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
index d3df8eea62..0f5d3655c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -32,7 +32,6 @@ import java.io.InputStream;
 import java.io.UTFDataFormatException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -46,6 +45,7 @@ import 
org.apache.ignite.internal.tostring.IgniteToStringBuilder;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.util.FastTimestamps;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.VarIntUtils;
 
 /**
  * Data input based on {@code Unsafe} operations.
@@ -465,10 +465,10 @@ public class IgniteUnsafeDataInput extends InputStream 
implements IgniteDataInpu
     /** {@inheritDoc} */
     @Override
     public UUID readUuid() throws IOException {
-        int length = readByte();
-        byte[] bytes = readByteArray(length);
+        long high = readLong();
+        long low = readLong();
 
-        return UUID.fromString(new String(bytes, StandardCharsets.UTF_8));
+        return new UUID(high, low);
     }
 
     /** {@inheritDoc} */
@@ -704,6 +704,11 @@ public class IgniteUnsafeDataInput extends InputStream 
implements IgniteDataInpu
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public long readVarInt() throws IOException {
+        // Decoding is delegated to the shared varint helper, which reads from this input.
+        return VarIntUtils.readVarInt(this);
+    }
+
     /** {@inheritDoc} */
     @Override
     public String readLine() throws IOException {
@@ -740,7 +745,7 @@ public class IgniteUnsafeDataInput extends InputStream 
implements IgniteDataInpu
     /** {@inheritDoc} */
     @Override
     public String readUTF() throws IOException {
-        return readUtfBody(VarInts.readUnsignedInt(this));
+        return readUtfBody(NaiveVarInts.readUnsignedInt(this));
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
index 7dee8c054d..4996262e93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -44,6 +43,7 @@ import 
org.apache.ignite.internal.tostring.IgniteToStringBuilder;
 import org.apache.ignite.internal.util.FastTimestamps;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.StringIntrospection;
+import org.apache.ignite.internal.util.VarIntUtils;
 
 /**
  * Data output based on {@code Unsafe} operations.
@@ -428,10 +428,8 @@ public class IgniteUnsafeDataOutput extends OutputStream 
implements IgniteDataOu
     /** {@inheritDoc} */
     @Override
     public void writeUuid(UUID val) throws IOException {
-        byte[] bytes = val.toString().getBytes(StandardCharsets.US_ASCII);
-
-        writeByte(bytes.length);
-        writeByteArray(bytes);
+        writeLong(val.getMostSignificantBits());
+        writeLong(val.getLeastSignificantBits());
     }
 
     /** {@inheritDoc} */
@@ -499,6 +497,11 @@ public class IgniteUnsafeDataOutput extends OutputStream 
implements IgniteDataOu
         out = null;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public void writeVarInt(long val) throws IOException {
+        // Encoding is delegated to the shared varint helper, which writes to this output.
+        VarIntUtils.writeVarInt(val, this);
+    }
+
     /** {@inheritDoc} */
     @Override
     public void writeByteArray(byte[] arr) throws IOException {
@@ -716,7 +719,7 @@ public class IgniteUnsafeDataOutput extends OutputStream 
implements IgniteDataOu
      * @throws IOException In case of error.
      */
     private void writeUtf(String s, int utfLen) throws IOException {
-        VarInts.writeUnsignedInt(utfLen, this);
+        NaiveVarInts.writeUnsignedInt(utfLen, this);
 
         if (utfLen == s.length()) {
             writeAsciiStringBytes(s);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
similarity index 84%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
index c1e9d4c8a9..095fdc298d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/NaiveVarInts.java
@@ -20,12 +20,18 @@ package org.apache.ignite.internal.util.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import org.apache.ignite.internal.util.VarIntUtils;
 
 /**
- * Utils to read/write variable length ints.
+ * Utils to read/write naive variable length ints.
+ *
+ * <p>The 'naivety' relates to a simple algorithm. The naive varints produce 
more compact results than the 'general purpose' varints
+ * (see {@link VarIntUtils}) when the input integer is approximately between 
127 and 254, so they seem to be more appropriate
+ * for small values (like lengths of short strings and collections).
  */
-public class VarInts {
-    private VarInts() {
+// TODO: IGNITE-23461 remove this class, use VarIntUtils instead.
+public class NaiveVarInts {
+    private NaiveVarInts() {
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
new file mode 100644
index 0000000000..a6c539430b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerialization.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.versioned;
+
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+
+import java.io.IOException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
+
+/**
+ * The entry point API to work with versioned serialization (used to persist objects).
+ *
+ * <p>To make it possible to persist an object, a {@link VersionedSerializer} is to be implemented and used with
+ * {@link #toBytes(Object, VersionedSerializer)} and {@link #fromBytes(byte[], VersionedSerializer)}.
+ *
+ * <p>It is the responsibility of a serializer to support transparent object structure changes (addition of fields and so on).
+ *
+ * @see VersionedSerializer
+ */
+public class VersionedSerialization {
+    /** Initial capacity (in bytes) of the buffer used for data output. */
+    private static final int INITIAL_BUFFER_CAPACITY = 256;
+
+    /** Not meant to be instantiated: the class only exposes static methods. */
+    private VersionedSerialization() {
+    }
+
+    /**
+     * Converts an object to bytes (including protocol version) that can be later converted back to an object
+     * using {@link #fromBytes(byte[], VersionedSerializer)}.
+     *
+     * @param object Object to serialize.
+     * @param serializer Serializer to do the serialization.
+     * @return Bytes representing the object.
+     * @throws IgniteInternalException If the serializer fails with an {@link IOException}.
+     * @see #fromBytes(byte[], VersionedSerializer)
+     */
+    public static <T> byte[] toBytes(T object, VersionedSerializer<T> serializer) {
+        try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(INITIAL_BUFFER_CAPACITY)) {
+            serializer.writeExternal(object, out);
+
+            return out.array();
+        } catch (IOException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, "Cannot serialize", e);
+        }
+    }
+
+    /**
+     * Deserializes an object serialized with {@link #toBytes(Object, VersionedSerializer)}.
+     *
+     * @param bytes Bytes to deserialize from.
+     * @param serializer Serializer to use.
+     * @return Deserialized object.
+     * @throws IgniteInternalException If the serializer fails with an {@link IOException} or if some bytes
+     *     are left unread after the object has been deserialized.
+     */
+    public static <T> T fromBytes(byte[] bytes, VersionedSerializer<T> serializer) {
+        IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(bytes);
+
+        try {
+            T result = serializer.readExternal(in);
+
+            // Leftover bytes mean the input does not match what the serializer expects; fail instead of ignoring them.
+            if (in.available() != 0) {
+                throw new IOException(in.available() + " bytes left unread after deserializing " + result);
+            }
+
+            return result;
+        } catch (IOException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, "Cannot deserialize", e);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
new file mode 100644
index 0000000000..a2d78c5d5d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/versioned/VersionedSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.versioned;
+
+import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * Serializes and deserializes objects in a versioned way: that is, includes version to make it possible to deserialize objects serialized
+ * (and persisted) in earlier versions later when the corresponding class's structure has changed.
+ *
+ * <p>It is supposed to be used for the cases when some object needs to be persisted to be stored for some time (maybe, a long time).
+ *
+ * <p>If a format is changed (for example, a new field is added), the version returned by {@link #getProtocolVersion()} has
+ * to be incremented.
+ */
+public abstract class VersionedSerializer<T> {
+    /** Magic number to detect correct serialized objects. Its lowest byte is zero: that byte of the header carries the version. */
+    private static final int MAGIC = 0x43BEEF00;
+
+    /** Mask selecting the signature part of the header (everything except the lowest byte, which holds the version). */
+    private static final int MAGIC_MASK = 0xFFFFFF00;
+
+    /**
+     * Returns protocol version.
+     */
+    protected byte getProtocolVersion() {
+        return 1;
+    }
+
+    /**
+     * Save object's specific (that is, ignoring the signature and version) data content.
+     *
+     * @param object object to write.
+     * @param out Output to write data content.
+     * @throws IOException If an I/O error occurs.
+     */
+    protected abstract void writeExternalData(T object, IgniteDataOutput out) throws IOException;
+
+    /**
+     * Writes an object to an output, including a signature and version.
+     *
+     * @param object Object to write.
+     * @param out Output to which to write.
+     * @throws IOException If an I/O error occurs.
+     */
+    public final void writeExternal(T object, IgniteDataOutput out) throws IOException {
+        // The header is a single int: the signature in the upper three bytes, the unsigned version in the lowest one.
+        int hdr = MAGIC + Byte.toUnsignedInt(getProtocolVersion());
+
+        out.writeInt(hdr);
+
+        writeExternalData(object, out);
+    }
+
+    /**
+     * Load object's specific data content.
+     *
+     * @param protoVer Input object version.
+     * @param in Input to load data content.
+     * @throws IOException If an I/O error occurs.
+     */
+    protected abstract T readExternalData(byte protoVer, IgniteDataInput in) throws IOException;
+
+    /**
+     * Reads an object which was earlier saved with {@link #writeExternal(Object, IgniteDataOutput)}.
+     *
+     * <p>Signature is checked when reading: the upper three bytes of the header must match the magic number exactly.
+     *
+     * @param in Input from which to read.
+     * @return Object produced by {@link #readExternalData(byte, IgniteDataInput)}.
+     * @throws IOException If an I/O error occurs or the header does not carry the expected signature.
+     * @see #writeExternal(Object, IgniteDataOutput)
+     */
+    public final T readExternal(IgniteDataInput in) throws IOException {
+        int hdr = in.readInt();
+
+        // Compare the signature bytes exactly: 'hdr & MAGIC' would also accept any header that merely has
+        // all the MAGIC bits set (for example, 0xFFFFFFFF), letting corrupted data through.
+        if ((hdr & MAGIC_MASK) != MAGIC) {
+            throw new IOException("Unexpected serialized object header [actual=" + Integer.toHexString(hdr)
+                    + ", expected=" + Integer.toHexString(MAGIC) + "]");
+        }
+
+        // The lowest byte of the header is the protocol version put there by writeExternal().
+        byte ver = (byte) (hdr & 0xFF);
+
+        return readExternalData(ver, in);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
index 6f1ebc1b75..374e61d000 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/VarIntUtilsTest.java
@@ -22,6 +22,8 @@ import static org.hamcrest.Matchers.is;
 
 import java.nio.ByteBuffer;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -29,34 +31,56 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 class VarIntUtilsTest {
     @ParameterizedTest
-    @MethodSource("sampleInts")
-    void readingAndWritingVarintsIsCompatible(int intVal) {
+    @MethodSource("sampleLongs")
+    void readingAndWritingVarintsInMemoryIsCompatible(long longVal) {
         byte[] array = new byte[10];
 
-        VarIntUtils.putVarIntToBytes(intVal, array, 1);
+        VarIntUtils.putVarIntToBytes(longVal, array, 1);
 
         ByteBuffer buf = ByteBuffer.wrap(array);
         buf.position(1);
 
-        assertThat(VarIntUtils.readVarInt(buf), is(intVal));
+        assertThat(VarIntUtils.readVarInt(buf), is(longVal));
     }
 
     @ParameterizedTest
-    @MethodSource("sampleInts")
-    void writingVarIntReturnsNumberOfBytesInItsRepresentation(int intVal) {
+    @MethodSource("sampleLongs")
+    void readingAndWritingVarintsInIoIsCompatible(long longVal) throws 
Exception {
+        IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(100);
+
+        VarIntUtils.writeVarInt(longVal, out);
+
+        IgniteUnsafeDataInput in = new IgniteUnsafeDataInput(out.array());
+
+        assertThat(VarIntUtils.readVarInt(in), is(longVal));
+    }
+
+    @ParameterizedTest
+    @MethodSource("sampleLongs")
+    void writingVarIntInMemoryReturnsNumberOfBytesInItsRepresentation(long 
longVal) {
         byte[] array = new byte[10];
 
-        int len = VarIntUtils.putVarIntToBytes(intVal, array, 0);
+        int len = VarIntUtils.putVarIntToBytes(longVal, array, 0);
 
-        assertThat(VarIntUtils.varIntLength(intVal), is(len));
+        assertThat(VarIntUtils.varIntLength(longVal), is(len));
     }
 
     @ParameterizedTest
-    @MethodSource("sampleInts")
-    void readingVarIntConsumesExactlyItsBytes(int intVal) {
+    @MethodSource("sampleLongs")
+    void writingVarIntInIoReturnsNumberOfBytesInItsRepresentation(long 
longVal) throws Exception {
+        IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(100);
+
+        int len = VarIntUtils.writeVarInt(longVal, out);
+
+        assertThat(VarIntUtils.varIntLength(longVal), is(len));
+    }
+
+    @ParameterizedTest
+    @MethodSource("sampleLongs")
+    void readingVarIntFromMemoryConsumesExactlyItsBytes(long longVal) {
         byte[] array = new byte[10];
 
-        int len = VarIntUtils.putVarIntToBytes(intVal, array, 0);
+        int len = VarIntUtils.putVarIntToBytes(longVal, array, 0);
 
         ByteBuffer buf = ByteBuffer.wrap(array);
 
@@ -65,16 +89,30 @@ class VarIntUtilsTest {
         assertThat(buf.position(), is(len));
     }
 
-    private static Stream<Arguments> sampleInts() {
+    @ParameterizedTest
+    @MethodSource("sampleLongs")
+    void readingVarIntFromIoConsumesExactlyItsBytes(long longVal) throws Exception {
+        byte[] buffer = new byte[10];
+        int encodedLength = VarIntUtils.putVarIntToBytes(longVal, buffer, 0);
+
+        IgniteUnsafeDataInput input = new IgniteUnsafeDataInput(buffer);
+        VarIntUtils.readVarInt(input);
+
+        // Whatever follows the encoded varint in the buffer must still be available for reading.
+        int expectedRemaining = buffer.length - encodedLength;
+        assertThat(input.available(), is(expectedRemaining));
+    }
+
+    private static Stream<Arguments> sampleLongs() {
         return Stream.of(
-                -1, 0, 1,
-                128 - 2, 128 - 1, 255, 256,
-                128 * 128 - 2, 128 * 128 - 1,
-                65535, 65536,
-                128 * 128 * 128 - 2, 128 * 128 * 128 - 1,
-                16777215, 16777216,
-                128 * 128 * 128 * 128 - 2, 128 * 128 * 128 * 128 - 1,
-                2147483647
+                -1L, 0L, 1L,
+                128L - 2, 128L - 1, 255L, 256L,
+                128L * 128 - 2, 128L * 128 - 1,
+                65535L, 65536L,
+                128L * 128 * 128 - 2, 128L * 128 * 128 - 1,
+                16777215L, 16777216L,
+                128L * 128 * 128 * 128 - 2, 128L * 128 * 128 * 128 - 1,
+                2147483647L
         ).map(Arguments::of);
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
similarity index 91%
rename from 
modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
rename to 
modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
index c801e182c0..7ce70a22ad 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/io/NaiveVarIntsTest.java
@@ -33,11 +33,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-class VarIntsTest {
+class NaiveVarIntsTest {
     @ParameterizedTest
     @MethodSource("intRangeBorders")
     void writesIntsWithCorrectLengths(IntWriteSpec spec) throws Exception {
-        byte[] bytes = writeToBytes(output -> 
VarInts.writeUnsignedInt(spec.value, output));
+        byte[] bytes = writeToBytes(output -> 
NaiveVarInts.writeUnsignedInt(spec.value, output));
 
         assertThat(bytes.length, is(spec.expectedLength));
     }
@@ -73,8 +73,8 @@ class VarIntsTest {
     @ParameterizedTest
     @MethodSource("intRangeBorders")
     void writesAndReadsInts(IntWriteSpec spec) throws Exception {
-        byte[] bytes = writeToBytes(output -> 
VarInts.writeUnsignedInt(spec.value, output));
-        int result = readIntFromBytesConsuming(bytes, 
VarInts::readUnsignedInt);
+        byte[] bytes = writeToBytes(output -> 
NaiveVarInts.writeUnsignedInt(spec.value, output));
+        int result = readIntFromBytesConsuming(bytes, 
NaiveVarInts::readUnsignedInt);
 
         assertThat(result, is(spec.value));
     }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 5a55828be6..f6b6b169ab 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -31,10 +31,12 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.server.KeyValueUpdateContext;
 import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 
@@ -177,7 +179,13 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
 
         var clusterNodes2 = Set.of(NODE_1, NODE_2);
 
-        clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new 
LogicalTopologySnapshot(10L, clusterNodes2, clusterId)));
+        clusterStateStorage.put(
+                LOGICAL_TOPOLOGY_KEY,
+                VersionedSerialization.toBytes(
+                        new LogicalTopologySnapshot(10L, clusterNodes2, 
clusterId),
+                        LogicalTopologySnapshotSerializer.INSTANCE
+                )
+        );
 
         topology.fireTopologyLeap();
 
@@ -198,7 +206,13 @@ public class 
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
 
         var clusterNodes2 = Set.of(NODE_1, NODE_2);
 
-        clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new 
LogicalTopologySnapshot(10L, clusterNodes2, clusterId)));
+        clusterStateStorage.put(
+                LOGICAL_TOPOLOGY_KEY,
+                VersionedSerialization.toBytes(
+                        new LogicalTopologySnapshot(10L, clusterNodes2, 
clusterId),
+                        LogicalTopologySnapshotSerializer.INSTANCE
+                )
+        );
 
         keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), 
ByteUtils.longToBytesKeepingOrder(11L), KV_UPDATE_CONTEXT);
 
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index 155189179c..b36870ca9f 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -69,6 +69,7 @@ import 
org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
 import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshotSerializer;
 import 
org.apache.ignite.internal.distributionzones.BaseDistributionZoneManagerTest;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
@@ -85,7 +86,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.server.If;
 import 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
@@ -1284,7 +1285,13 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
 
         long topVer = topology.getLogicalTopology().version() + 1;
 
-        clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new 
LogicalTopologySnapshot(topVer, nodes)));
+        clusterStateStorage.put(
+                LOGICAL_TOPOLOGY_KEY,
+                VersionedSerialization.toBytes(
+                        new LogicalTopologySnapshot(topVer, nodes),
+                        LogicalTopologySnapshotSerializer.INSTANCE
+                )
+        );
 
         topology.fireTopologyLeap();
 
diff --git 
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
new file mode 100644
index 0000000000..dc6c2f551c
--- /dev/null
+++ 
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterNodeSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+
+/**
+ * A {@link VersionedSerializer} that persists and restores {@link ClusterNode} instances.
+ */
+public class ClusterNodeSerializer extends VersionedSerializer<ClusterNode> {
+    /** Serializer instance. */
+    public static final ClusterNodeSerializer INSTANCE = new ClusterNodeSerializer();
+
+    @Override
+    protected void writeExternalData(ClusterNode node, IgniteDataOutput out) throws IOException {
+        out.writeUuid(node.id());
+        out.writeUTF(node.name());
+
+        NetworkAddress address = node.address();
+        out.writeUTF(address.host());
+        out.writeVarInt(address.port());
+
+        // A boolean marker tells the reader whether the optional metadata section follows.
+        NodeMetadata metadata = node.nodeMetadata();
+        if (metadata == null) {
+            out.writeBoolean(false);
+            return;
+        }
+
+        out.writeBoolean(true);
+        out.writeUTF(metadata.restHost());
+        out.writeVarInt(metadata.httpPort());
+        out.writeVarInt(metadata.httpsPort());
+    }
+
+    @Override
+    protected ClusterNode readExternalData(byte protoVer, IgniteDataInput in) throws IOException {
+        UUID nodeId = in.readUuid();
+        String nodeName = in.readUTF();
+
+        String host = in.readUTF();
+        int port = in.readVarIntAsInt();
+        NetworkAddress address = new NetworkAddress(host, port);
+
+        NodeMetadata metadata = in.readBoolean() ? readMetadata(in) : null;
+
+        return new ClusterNodeImpl(nodeId, nodeName, address, metadata);
+    }
+
+    /** Reads the metadata section in the order it is written: REST host, HTTP port, HTTPS port. */
+    private static NodeMetadata readMetadata(IgniteDataInput in) throws IOException {
+        String restHost = in.readUTF();
+        int httpPort = in.readVarIntAsInt();
+        int httpsPort = in.readVarIntAsInt();
+
+        return new NodeMetadata(restHost, httpPort, httpsPort);
+    }
+}
diff --git 
a/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
 
b/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
new file mode 100644
index 0000000000..4dcfc3eeae
--- /dev/null
+++ 
b/modules/network-api/src/test/java/org/apache/ignite/internal/network/ClusterNodeSerializerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.Base64;
+import java.util.UUID;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeMetadata;
+import org.junit.jupiter.api.Test;
+
+class ClusterNodeSerializerTest {
+    private final ClusterNodeSerializer serializer = new ClusterNodeSerializer();
+
+    @Test
+    void serializationAndDeserialization() {
+        ClusterNode originalNode = new ClusterNodeImpl(
+                UUID.randomUUID(),
+                "test",
+                new NetworkAddress("host", 3000),
+                new NodeMetadata("ext-host", 3001, 3002)
+        );
+
+        byte[] bytes = VersionedSerialization.toBytes(originalNode, serializer);
+        ClusterNode restoredNode = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertIsSampleNode(restoredNode, originalNode.id());
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        // Bytes of the sample node as written by protocol version 1.
+        String v1Base64 = "Ae++Q5FHweYb0PNweo7gWHuyYaUEdGVzdARob3N0uRcBCGV4dC1ob3N0uhe7Fw==";
+        byte[] bytes = Base64.getDecoder().decode(v1Base64);
+
+        ClusterNode restoredNode = VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertIsSampleNode(restoredNode, UUID.fromString("70f3d01b-e6c1-4791-a561-b27b58e08e7a"));
+    }
+
+    /** Asserts that the node has the given ID and the name, address and metadata shared by both test samples. */
+    private static void assertIsSampleNode(ClusterNode node, UUID expectedId) {
+        assertThat(node.id(), equalTo(expectedId));
+        assertThat(node.name(), equalTo("test"));
+        assertThat(node.address(), equalTo(new NetworkAddress("host", 3000)));
+        assertThat(node.nodeMetadata(), equalTo(new NodeMetadata("ext-host", 3001, 3002)));
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
index d991709b3d..0f775a6d9d 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
@@ -21,7 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.BitSet;
-import org.apache.ignite.internal.util.io.VarInts;
+import org.apache.ignite.internal.util.io.NaiveVarInts;
 
 /**
  * Protocol-wide elements marshalling.
@@ -31,28 +31,28 @@ class ProtocolMarshalling {
     static final int MAX_LENGTH_BYTE_COUNT = 4;
 
     static void writeDescriptorOrCommandId(int id, DataOutput output) throws IOException {
-        VarInts.writeUnsignedInt(id, output);
+        NaiveVarInts.writeUnsignedInt(id, output);
     }
 
     static int readDescriptorOrCommandId(DataInput input) throws IOException {
-        return VarInts.readUnsignedInt(input);
+        return NaiveVarInts.readUnsignedInt(input);
     }
 
     static void writeObjectId(int id, DataOutput output) throws IOException {
-        VarInts.writeUnsignedInt(id, output);
+        NaiveVarInts.writeUnsignedInt(id, output);
     }
 
     static int readObjectId(DataInput input) throws IOException {
-        return VarInts.readUnsignedInt(input);
+        return NaiveVarInts.readUnsignedInt(input);
     }
 
 
     static void writeLength(int length, DataOutput output) throws IOException {
-        VarInts.writeUnsignedInt(length, output);
+        NaiveVarInts.writeUnsignedInt(length, output);
     }
 
     static int readLength(DataInput input) throws IOException {
-        return VarInts.readUnsignedInt(input);
+        return NaiveVarInts.readUnsignedInt(input);
     }
 
     static void writeFixedLengthBitSet(BitSet bitset, int bitSetLength, DataOutput output) throws IOException {
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
index 2d48f9ae17..2a9a5042e6 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/marshaller/PartitionCommandsMarshallerImpl.java
@@ -64,6 +64,6 @@ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller impleme
      */
     @Override
     public int readRequiredCatalogVersion(ByteBuffer raw) {
-        return VarIntUtils.readVarInt(raw);
+        return (int) VarIntUtils.readVarInt(raw);
     }
 }
diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
index 01dd9d681e..066e8eb360 100644
--- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
+++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -22,12 +22,14 @@ import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 
 import java.util.UUID;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import org.apache.ignite.internal.disaster.system.storage.ClusterResetStorage;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -72,7 +74,8 @@ public class SystemDisasterRecoveryStorage implements ClusterResetStorage {
      *     the cluster yet).
      */
     public @Nullable ClusterState readClusterState() {
-        return readFromVault(CLUSTER_STATE_VAULT_KEY);
+        VaultEntry entry = vault.get(CLUSTER_STATE_VAULT_KEY);
+        return entry != null ? VersionedSerialization.fromBytes(entry.value(), ClusterStatePersistentSerializer.INSTANCE) : null;
     }
 
     private <T> @Nullable T readFromVault(ByteArray key) {
@@ -81,7 +84,7 @@ public class SystemDisasterRecoveryStorage implements ClusterResetStorage {
     }
 
     void saveClusterState(ClusterState clusterState) {
-        vault.put(CLUSTER_STATE_VAULT_KEY, ByteUtils.toBytes(clusterState));
+        vault.put(CLUSTER_STATE_VAULT_KEY, VersionedSerialization.toBytes(clusterState, ClusterStatePersistentSerializer.INSTANCE));
     }
 
     boolean isInitConfigApplied() {
diff --git a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index 98719f97b5..d3031a3706 100644
--- a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -28,7 +28,6 @@ import static org.apache.ignite.internal.testframework.asserts.CompletableFuture
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,6 +62,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterStatePersistentSerializer;
 import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
 import org.apache.ignite.internal.disaster.system.exception.ClusterResetException;
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
@@ -214,7 +215,7 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
         VaultEntry entry = vaultManager.get(CLUSTER_STATE_VAULT_KEY);
         assertThat(entry, is(notNullValue()));
 
-        ClusterState savedState = fromBytes(entry.value());
+        ClusterState savedState = VersionedSerialization.fromBytes(entry.value(), ClusterStatePersistentSerializer.INSTANCE);
         assertThat(savedState, is(equalTo(usualClusterState)));
     }
 
@@ -285,7 +286,10 @@ class SystemDisasterRecoveryManagerImplTest extends BaseIgniteAbstractTest {
     }
 
     private void putClusterState() {
-        vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
+        vaultManager.put(
+                CLUSTER_STATE_VAULT_KEY,
+                VersionedSerialization.toBytes(usualClusterState, ClusterStatePersistentSerializer.INSTANCE)
+        );
     }
 
     private void markInitConfigApplied() {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
index c42cee416c..2a42165081 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
@@ -64,6 +64,6 @@ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller impleme
      */
     @Override
     public int readRequiredCatalogVersion(ByteBuffer raw) {
-        return VarIntUtils.readVarInt(raw);
+        return (int) VarIntUtils.readVarInt(raw);
     }
 }

Reply via email to