This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eda6173fb8 IGNITE-28052 Use MessageSerializer for InitMessage (#12912)
8eda6173fb8 is described below

commit 8eda6173fb806f31372d2cd152fce9876dc3e964
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Mar 20 22:52:46 2026 +0300

    IGNITE-28052 Use MessageSerializer for InitMessage (#12912)
---
 .../discovery/DiscoveryMessageFactory.java         |  24 +++++
 .../encryption/ChangeCacheEncryptionRequest.java   |  35 +++++--
 .../managers/encryption/GridEncryptionManager.java |  71 --------------
 .../encryption/MasterKeyChangeRequest.java         | 107 +++++++++++++++++++++
 .../snapshot/AbstractSnapshotOperationRequest.java |  36 ++++---
 .../snapshot/IgniteSnapshotManager.java            |  35 -------
 .../snapshot/SnapshotCheckProcessRequest.java      |  24 +++--
 .../snapshot/SnapshotOperationEndRequest.java      |  33 +++++--
 .../snapshot/SnapshotOperationRequest.java         |  41 +++++---
 .../snapshot/SnapshotRestoreProcess.java           |  70 +++++++-------
 .../snapshot/SnapshotRestoreStartRequest.java      |  56 +++++++++++
 .../snapshot/SnapshotStartDiscoveryMessage.java    |  71 ++++++++++++++
 .../PerformanceStatisticsProcessor.java            |   3 +-
 .../util/distributed/DistributedProcess.java       |   3 +-
 .../internal/util/distributed/InitMessage.java     |  34 +++++--
 .../encryption/CacheGroupKeyChangeTest.java        |   4 +-
 .../snapshot/IgniteSnapshotManagerSelfTest.java    |   2 +-
 .../DistributedProcessClientAwaitTest.java         |  24 ++---
 .../DistributedProcessCoordinatorLeftTest.java     |   8 +-
 .../DistributedProcessErrorHandlingTest.java       |   6 +-
 20 files changed, 460 insertions(+), 227 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 69800247c4b..e30853967bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.managers.discovery;
 
 import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import 
org.apache.ignite.internal.managers.communication.ErrorMessageMarshallableSerializer;
+import 
org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
+import 
org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequestSerializer;
+import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
+import 
org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequestSerializer;
 import org.apache.ignite.internal.processors.authentication.User;
 import 
org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
 import 
org.apache.ignite.internal.processors.authentication.UserAcceptedMessageSerializer;
@@ -55,18 +59,28 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.Snapshot
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponseSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponseMarshallableSerializer;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcessRequest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcessRequestSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponseSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResultSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponseMarshallableSerializer;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationEndRequest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationEndRequestSerializer;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequestSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponseSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponseMarshallableSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponse;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponseMarshallableSerializer;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStartRequest;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStartRequestSerializer;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotStartDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotStartDiscoveryMessageSerializer;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer;
 import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -107,6 +121,8 @@ import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexD
 import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperationSerializer;
 import org.apache.ignite.internal.util.distributed.FullMessage;
 import org.apache.ignite.internal.util.distributed.FullMessageSerializer;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.distributed.InitMessageSerializer;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -264,6 +280,14 @@ public class DiscoveryMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
             new TcpDiscoveryNodeAddedMessageMarshallableSerializer(marsh, 
clsLdr));
         factory.register((short)30, FullMessage::new, new 
FullMessageSerializer());
+        factory.register((short)31, InitMessage::new, new 
InitMessageSerializer());
+        factory.register((short)32, SnapshotStartDiscoveryMessage::new, new 
SnapshotStartDiscoveryMessageSerializer());
+        factory.register((short)33, SnapshotCheckProcessRequest::new, new 
SnapshotCheckProcessRequestSerializer());
+        factory.register((short)34, SnapshotOperationRequest::new, new 
SnapshotOperationRequestSerializer());
+        factory.register((short)35, MasterKeyChangeRequest::new, new 
MasterKeyChangeRequestSerializer());
+        factory.register((short)36, SnapshotOperationEndRequest::new, new 
SnapshotOperationEndRequestSerializer());
+        factory.register((short)37, SnapshotRestoreStartRequest::new, new 
SnapshotRestoreStartRequestSerializer());
+        factory.register((short)38, ChangeCacheEncryptionRequest::new, new 
ChangeCacheEncryptionRequestSerializer());
 
         factory.register((short)86, GridCacheVersion::new, new 
GridCacheVersionSerializer());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
index 4498c47d8d9..abd75f6c8c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
@@ -17,32 +17,41 @@
 
 package org.apache.ignite.internal.managers.encryption;
 
-import java.io.Serializable;
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 
 /**
  * Change cache group encryption key request.
  */
 @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-public class ChangeCacheEncryptionRequest implements Serializable {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
+public class ChangeCacheEncryptionRequest implements Message {
     /** Request ID. */
-    private final UUID reqId = UUID.randomUUID();
+    @Order(0)
+    UUID reqId;
 
     /** Cache group IDs. */
-    private final int[] grpIds;
+    @Order(1)
+    int[] grpIds;
 
     /** Encryption keys. */
-    private final byte[][] keys;
+    @Order(2)
+    byte[][] keys;
 
     /** Key identifiers. */
-    private final byte[] keyIds;
+    @Order(3)
+    byte[] keyIds;
 
     /** Master key digest. */
-    private final byte[] masterKeyDigest;
+    @Order(4)
+    byte[] masterKeyDigest;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public ChangeCacheEncryptionRequest() {
+        // No-op.
+    }
 
     /**
      * @param grpIds Cache group IDs.
@@ -51,6 +60,7 @@ public class ChangeCacheEncryptionRequest implements 
Serializable {
      * @param masterKeyDigest Master key digest.
      */
     public ChangeCacheEncryptionRequest(int[] grpIds, byte[][] keys, byte[] 
keyIds, byte[] masterKeyDigest) {
+        this.reqId = UUID.randomUUID();
         this.grpIds = grpIds;
         this.keys = keys;
         this.keyIds = keyIds;
@@ -103,6 +113,11 @@ public class ChangeCacheEncryptionRequest implements 
Serializable {
         return Objects.equals(reqId, ((ChangeCacheEncryptionRequest)o).reqId);
     }
 
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 38;
+    }
+
     /** {@inheritDoc} */
     @Override public int hashCode() {
         return Objects.hash(reqId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index c5a217d3725..e20b1effd18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -1748,77 +1748,6 @@ public class GridEncryptionManager extends GridManagerAdapter<EncryptionSpi> imp
         });
     }
 
-    /** Master key change request. */
-    private static class MasterKeyChangeRequest implements Serializable {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0L;
-
-        /** Request id. */
-        private final UUID reqId;
-
-        /** Encrypted master key name. */
-        private final byte[] encKeyName;
-
-        /** Master key digest. */
-        private final byte[] digest;
-
-        /**
-         * @param reqId Request id.
-         * @param encKeyName Encrypted master key name.
-         * @param digest Master key digest.
-         */
-        private MasterKeyChangeRequest(UUID reqId, byte[] encKeyName, byte[] 
digest) {
-            this.reqId = reqId;
-            this.encKeyName = encKeyName;
-            this.digest = digest;
-        }
-
-        /** @return Request id. */
-        UUID requestId() {
-            return reqId;
-        }
-
-        /** @return Encrypted master key name. */
-        byte[] encKeyName() {
-            return encKeyName;
-        }
-
-        /** @return Master key digest. */
-        byte[] digest() {
-            return digest;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (!(o instanceof MasterKeyChangeRequest))
-                return false;
-
-            MasterKeyChangeRequest key = (MasterKeyChangeRequest)o;
-
-            return Arrays.equals(encKeyName, key.encKeyName) &&
-                Arrays.equals(digest, key.digest) &&
-                Objects.equals(reqId, key.reqId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = Objects.hash(reqId);
-
-            res = 31 * res + Arrays.hashCode(encKeyName);
-            res = 31 * res + Arrays.hashCode(digest);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MasterKeyChangeRequest.class, this);
-        }
-    }
-
     /** */
     protected static class NodeEncryptionKeys implements Serializable {
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java
new file mode 100644
index 00000000000..e4f4ce375e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** Master key change request. */
+public class MasterKeyChangeRequest implements Message {
+    /** Request id. */
+    @Order(0)
+    UUID reqId;
+
+    /** Encrypted master key name. */
+    @Order(1)
+    byte[] encKeyName;
+
+    /** Master key digest. */
+    @Order(2)
+    byte[] digest;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public MasterKeyChangeRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request id.
+     * @param encKeyName Encrypted master key name.
+     * @param digest Master key digest.
+     */
+    public MasterKeyChangeRequest(UUID reqId, byte[] encKeyName, byte[] 
digest) {
+        this.reqId = reqId;
+        this.encKeyName = encKeyName;
+        this.digest = digest;
+    }
+
+    /** @return Request id. */
+    UUID requestId() {
+        return reqId;
+    }
+
+    /** @return Encrypted master key name. */
+    byte[] encKeyName() {
+        return encKeyName;
+    }
+
+    /** @return Master key digest. */
+    byte[] digest() {
+        return digest;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof MasterKeyChangeRequest))
+            return false;
+
+        MasterKeyChangeRequest key = (MasterKeyChangeRequest)o;
+
+        return Arrays.equals(encKeyName, key.encKeyName) &&
+            Arrays.equals(digest, key.digest) &&
+            Objects.equals(reqId, key.reqId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = Objects.hash(reqId);
+
+        res = 31 * res + Arrays.hashCode(encKeyName);
+        res = 31 * res + Arrays.hashCode(digest);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 35;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MasterKeyChangeRequest.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
index 3a161664a54..a12210c29cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
@@ -17,46 +17,52 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Snapshot operation start request for {@link DistributedProcess} initiate 
message.
  */
-abstract class AbstractSnapshotOperationRequest implements Serializable {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
+abstract class AbstractSnapshotOperationRequest implements Message {
     /** Request ID. */
-    @GridToStringInclude
-    private final UUID reqId;
+    @Order(0)
+    UUID reqId;
 
     /** Snapshot name. */
-    @GridToStringInclude
-    private final String snpName;
+    @Order(1)
+    String snpName;
 
     /** Snapshot directory path. */
-    @GridToStringInclude
-    private final String snpPath;
+    @Order(2)
+    String snpPath;
 
     /** Collection of cache group names. */
+    @Order(3)
     @GridToStringInclude
-    private final Collection<String> grps;
+    Collection<String> grps;
 
     /** Start time. */
-    @GridToStringInclude
-    private final long startTime;
+    @Order(4)
+    long startTime;
 
     /** IDs of the nodes that must be alive to complete the operation. */
     @GridToStringInclude
-    private final Set<UUID> nodes;
+    @Order(5)
+    Set<UUID> nodes;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public AbstractSnapshotOperationRequest() {
+        // No-op.
+    }
 
     /**
      * @param reqId Request ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7445cb07582..849e87175e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -4123,41 +4123,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         }
     }
 
-    /** Snapshot operation start message. */
-    private static class SnapshotStartDiscoveryMessage extends 
InitMessage<SnapshotOperationRequest>
-        implements SnapshotDiscoveryMessage {
-        /** Serial version UID. */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final boolean needExchange;
-
-        /**
-         * @param procId Unique process id.
-         * @param req Snapshot initial request.
-         */
-        public SnapshotStartDiscoveryMessage(UUID procId, 
SnapshotOperationRequest req) {
-            super(procId, START_SNAPSHOT, req, req.incremental());
-
-            needExchange = !req.incremental();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean needExchange() {
-            return needExchange;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean needAssignPartitions() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SnapshotStartDiscoveryMessage.class, this, 
super.toString());
-        }
-    }
-
     /** */
     public static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
         /** Unique snapshot request id. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index ec646f25420..155c690a03d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.util.Collection;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,23 +32,28 @@ import org.jetbrains.annotations.Nullable;
  * @see SnapshotCheckProcess
  */
 public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationRequest {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
     /** If {@code true}, additionally calculates partition hashes. Otherwise, 
checks only snapshot integrity and partition counters. */
     @GridToStringInclude
-    private final boolean fullCheck;
+    @Order(0)
+    boolean fullCheck;
 
     /**
      * If {@code true}, all the registered {@link 
IgniteSnapshotManager#handlers()} of type {@link SnapshotHandlerType#RESTORE}
      * are invoked. Otherwise, only snapshot metadatas and partition hashes 
are validated.
      */
     @GridToStringInclude
-    private final boolean allRestoreHandlers;
+    @Order(1)
+    boolean allRestoreHandlers;
 
     /** Incremental snapshot index. If not positive, snapshot is not 
considered as incremental. */
     @GridToStringInclude
-    private final int incIdx;
+    @Order(2)
+    int incIdx;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public SnapshotCheckProcessRequest() {
+        // No-op.
+    }
 
     /**
      * Creates snapshot check process request.
@@ -100,6 +107,11 @@ public class SnapshotCheckProcessRequest extends 
AbstractSnapshotOperationReques
         return incIdx;
     }
 
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 33;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SnapshotCheckProcessRequest.class, this, 
super.toString());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
index ce13f905857..c826b20f5f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
@@ -17,33 +17,41 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Snapshot operation end request for {@link 
DistributedProcessType#END_SNAPSHOT} initiate message.
  */
-public class SnapshotOperationEndRequest implements Serializable {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
+public class SnapshotOperationEndRequest implements Message {
     /** Request ID. */
     @GridToStringInclude
-    private final UUID reqId;
+    @Order(0)
+    UUID reqId;
 
     /** Exception occurred during snapshot operation processing. */
-    @Nullable private final Throwable err;
+    @Order(1)
+    @Nullable ErrorMessage err;
 
     /**
      * Snapshot operation warnings. Warnings do not interrupt snapshot process 
but raise exception at the end to make
      * the operation status 'not OK' if no other error occurred.
      */
-    @Nullable private final List<String> warnings;
+    @Order(2)
+    @Nullable List<String> warnings;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public SnapshotOperationEndRequest() {
+        // No-op.
+    }
 
     /**
      * @param id Request ID.
@@ -52,7 +60,7 @@ public class SnapshotOperationEndRequest implements 
Serializable {
      */
     public SnapshotOperationEndRequest(UUID id, @Nullable Throwable err, 
@Nullable List<String> warnings) {
         reqId = id;
-        this.err = err;
+        this.err = new ErrorMessage(err);
         this.warnings = warnings;
     }
 
@@ -63,7 +71,7 @@ public class SnapshotOperationEndRequest implements 
Serializable {
 
     /** @return Exception occurred during snapshot operation processing. */
     @Nullable public Throwable error() {
-        return err;
+        return ErrorMessage.error(err);
     }
 
     /** @return Warnings of snapshot operation. */
@@ -71,6 +79,11 @@ public class SnapshotOperationEndRequest implements 
Serializable {
         return warnings;
     }
 
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 36;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SnapshotOperationEndRequest.class, this, 
super.toString());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index e4a04bec8a5..b6735591320 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -20,40 +20,52 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Snapshot operation end request for {@link 
DistributedProcess.DistributedProcessType#START_SNAPSHOT} initiate message.
  */
 public class SnapshotOperationRequest extends AbstractSnapshotOperationRequest 
{
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
-
     /** Operational node ID. */
-    private final UUID opNodeId;
+    @Order(0)
+    UUID opNodeId;
 
     /** If {@code true} then incremental snapshot requested. */
-    private final boolean incremental;
+    @Order(1)
+    boolean incremental;
 
     /** Index of incremental snapshot. */
-    private final int incIdx;
+    @Order(2)
+    int incIdx;
 
     /** If {@code true} snapshot only primary copies of partitions. */
-    private final boolean onlyPrimary;
+    @Order(3)
+    boolean onlyPrimary;
 
     /** If {@code true} then create dump. */
-    private final boolean dump;
+    @Order(4)
+    boolean dump;
 
     /** If {@code true} then compress partition files. */
-    private final boolean compress;
+    @Order(5)
+    boolean compress;
 
     /** If {@code true} then content of dump encrypted. */
-    private final boolean encrypt;
+    @Order(6)
+    boolean encrypt;
 
     /** If {@code true} then only cache config and metadata included in 
snapshot. */
-    private final boolean configOnly;
+    @Order(7)
+    boolean configOnly;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public SnapshotOperationRequest() {
+        // No-op.
+    }
 
     /**
      * @param reqId Request ID.
@@ -139,8 +151,13 @@ public class SnapshotOperationRequest extends 
AbstractSnapshotOperationRequest {
         return configOnly;
     }
 
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 34;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(SnapshotOperationRequest.class, this);
+        return S.toString(SnapshotOperationRequest.class, this, 
super.toString());
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 84d12d3b24c..036dcdc1312 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -137,19 +137,19 @@ public class SnapshotRestoreProcess {
     private final DistributedProcess<SnapshotOperationRequest, 
SnapshotRestoreOperationResponse> prepareRestoreProc;
 
     /** Cache group restore preload partitions phase. */
-    private final DistributedProcess<UUID, Message> preloadProc;
+    private final DistributedProcess<SnapshotRestoreStartRequest, Message> 
preloadProc;
 
     /** Cache group restore cache start phase. */
-    private final DistributedProcess<UUID, Message> cacheStartProc;
+    private final DistributedProcess<SnapshotRestoreStartRequest, Message> 
cacheStartProc;
 
     /** Cache group restore cache stop phase. */
-    private final DistributedProcess<UUID, Message> cacheStopProc;
+    private final DistributedProcess<SnapshotRestoreStartRequest, Message> 
cacheStopProc;
 
     /** Incremental snapshot restore phase. */
-    private final DistributedProcess<UUID, Message> incSnpRestoreProc;
+    private final DistributedProcess<SnapshotRestoreStartRequest, Message> 
incSnpRestoreProc;
 
     /** Cache group restore rollback phase. */
-    private final DistributedProcess<UUID, Message> rollbackRestoreProc;
+    private final DistributedProcess<SnapshotRestoreStartRequest, Message> 
rollbackRestoreProc;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -861,7 +861,7 @@ public class SnapshotRestoreProcess {
         opCtx0.cfgs = globalCfgs;
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            preloadProc.start(reqId, reqId);
+            preloadProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
     }
 
     /**
@@ -909,18 +909,20 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * @param reqId Request id.
+     * @param req Request.
      * @return Future which will be completed when the preload ends.
      */
-    private IgniteInternalFuture<Message> preload(UUID reqId) {
+    private IgniteInternalFuture<Message> preload(SnapshotRestoreStartRequest 
req) {
         if (ctx.clientNode())
             return new GridFinishedFuture<>();
 
         SnapshotRestoreContext opCtx0 = opCtx;
         GridFutureAdapter<Message> retFut = new GridFutureAdapter<>();
 
-        if (opCtx0 == null)
-            return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot restore process has incorrect restore state: " 
+ reqId));
+        if (opCtx0 == null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException(
+                "Snapshot restore process has incorrect restore state: " + 
req.requestId()));
+        }
 
         if (opCtx0.dirs.isEmpty())
             return new GridFinishedFuture<>();
@@ -940,7 +942,7 @@ public class SnapshotRestoreProcess {
 
             if (log.isInfoEnabled()) {
                 log.info("Starting snapshot preload operation to restore cache 
groups " +
-                    "[reqId=" + reqId +
+                    "[reqId=" + req.requestId() +
                     ", snapshot=" + opCtx0.snpName +
                     ", caches=" + F.transform(opCtx0.dirs.values(), s -> 
NodeFileTree.cacheName(s.get(0))) + ']');
             }
@@ -1057,7 +1059,7 @@ public class SnapshotRestoreProcess {
 
                         if (log.isInfoEnabled()) {
                             log.info("The snapshot was taken on the same 
cluster topology. The index will be copied to " +
-                                "restoring cache group if necessary [reqId=" + 
reqId + ", snapshot=" + opCtx0.snpName +
+                                "restoring cache group if necessary [reqId=" + 
req.requestId() + ", snapshot=" + opCtx0.snpName +
                                 ", dir=" + e.getValue().get(0).getName() + 
']');
                         }
 
@@ -1093,7 +1095,7 @@ public class SnapshotRestoreProcess {
                 for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : 
snpAff.entrySet()) {
                     if (log.isInfoEnabled()) {
                         log.info("Trying to request partitions from remote 
node " +
-                            "[reqId=" + reqId +
+                            "[reqId=" + req.requestId() +
                             ", snapshot=" + opCtx0.snpName +
                             ", nodeId=" + m.getKey() +
                             ", grpParts=" + 
partitionsMapToString(m.getValue(), cacheGrpNames) + "]");
@@ -1248,20 +1250,20 @@ public class SnapshotRestoreProcess {
 
         if (failure != null) {
             if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+                rollbackRestoreProc.start(reqId, new 
SnapshotRestoreStartRequest(reqId));
 
             return;
         }
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            cacheStartProc.start(reqId, reqId);
+            cacheStartProc.start(reqId, new 
SnapshotRestoreStartRequest(reqId));
     }
 
     /**
-     * @param reqId Request ID.
+     * @param req Request.
      * @return Result future.
      */
-    private IgniteInternalFuture<Message> cacheStart(UUID reqId) {
+    private IgniteInternalFuture<Message> 
cacheStart(SnapshotRestoreStartRequest req) {
         if (ctx.clientNode())
             return new GridFinishedFuture<>();
 
@@ -1285,7 +1287,7 @@ public class SnapshotRestoreProcess {
 
         // We set the topology node IDs required to successfully start the 
cache, if any of the required nodes leave
         // the cluster during the cache startup, the whole procedure will be 
rolled back.
-        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, 
false, IgniteUuid.fromUuid(reqId))
+        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, 
false, IgniteUuid.fromUuid(req.requestId()))
             .chain(fut -> {
                 if (fut.error() != null)
                     throw F.wrap(fut.error());
@@ -1311,7 +1313,7 @@ public class SnapshotRestoreProcess {
         if (failure == null) {
             if (opCtx0.incIdx > 0) {
                 if (U.isLocalNodeCoordinator(ctx.discovery()))
-                    incSnpRestoreProc.start(reqId, reqId);
+                    incSnpRestoreProc.start(reqId, new 
SnapshotRestoreStartRequest(reqId));
 
                 return;
             }
@@ -1324,18 +1326,18 @@ public class SnapshotRestoreProcess {
         opCtx0.err.compareAndSet(null, failure);
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            cacheStopProc.start(reqId, reqId);
+            cacheStopProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
     }
 
     /**
-     * @param reqId Request ID.
+     * @param req Request.
      * @return Result future.
      */
-    private IgniteInternalFuture<Message> cacheStop(UUID reqId) {
+    private IgniteInternalFuture<Message> 
cacheStop(SnapshotRestoreStartRequest req) {
         if (!U.isLocalNodeCoordinator(ctx.discovery()))
             return new GridFinishedFuture<>();
 
-        assert opCtx.reqId == reqId;
+        assert opCtx.reqId == req.requestId();
 
         SnapshotRestoreContext opCtx0 = opCtx;
 
@@ -1345,7 +1347,7 @@ public class SnapshotRestoreProcess {
             .collect(Collectors.toSet());
 
         if (log.isInfoEnabled())
-            log.info("Stopping caches [reqId=" + reqId + ", caches=" + 
stopCaches + ']');
+            log.info("Stopping caches [reqId=" + req.requestId() + ", caches=" 
+ stopCaches + ']');
 
         // Skip deleting cache files as they will be removed during rollback.
         return ctx.cache().dynamicDestroyCaches(stopCaches, false, false)
@@ -1373,16 +1375,16 @@ public class SnapshotRestoreProcess {
         }
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            rollbackRestoreProc.start(reqId, reqId);
+            rollbackRestoreProc.start(reqId, new 
SnapshotRestoreStartRequest(reqId));
     }
 
     /**
      * Inits restoring incremental snapshot.
      *
-     * @param reqId Request ID.
+     * @param req Request.
      * @return Result future.
      */
-    private IgniteInternalFuture<Message> incrementalSnapshotRestore(UUID 
reqId) {
+    private IgniteInternalFuture<Message> incrementalSnapshotRestore(Message 
req) {
         SnapshotRestoreContext opCtx0 = opCtx;
 
         if (ctx.clientNode() || opCtx0 == null || 
!opCtx0.nodes().contains(ctx.localNodeId()))
@@ -1575,7 +1577,7 @@ public class SnapshotRestoreProcess {
         opCtx0.err.compareAndSet(null, failure);
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            cacheStopProc.start(reqId, reqId);
+            cacheStopProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
     }
 
     /**
@@ -1647,10 +1649,10 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * @param reqId Request ID.
+     * @param req Request.
      * @return Result future.
      */
-    private IgniteInternalFuture<Message> rollback(UUID reqId) {
+    private IgniteInternalFuture<Message> rollback(SnapshotRestoreStartRequest 
req) {
         if (ctx.clientNode())
             return new GridFinishedFuture<>();
 
@@ -1668,7 +1670,7 @@ public class SnapshotRestoreProcess {
         try {
             
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
                 if (log.isInfoEnabled()) {
-                    log.info("Removing restored cache directories [reqId=" + 
reqId +
+                    log.info("Removing restored cache directories [reqId=" + 
req.requestId() +
                         ", snapshot=" + opCtx0.snpName + ", dirs=" + 
opCtx0.dirs.values() + ']');
                 }
 
@@ -1680,14 +1682,14 @@ public class SnapshotRestoreProcess {
 
                         if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
                             log.error("Unable to perform rollback routine 
completely, cannot remove temp directory " +
-                                "[reqId=" + reqId + ", snapshot=" + 
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+                                "[reqId=" + req.requestId() + ", snapshot=" + 
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
 
                             ex = new IgniteCheckedException("Unable to remove 
temporary cache directory " + cacheDir);
                         }
 
                         if (cacheDir.exists() && !U.delete(cacheDir)) {
                             log.error("Unable to perform rollback routine 
completely, cannot remove cache directory " +
-                                "[reqId=" + reqId + ", snapshot=" + 
opCtx0.snpName + ", dir=" + cacheDir + ']');
+                                "[reqId=" + req.requestId() + ", snapshot=" + 
opCtx0.snpName + ", dir=" + cacheDir + ']');
 
                             ex = new IgniteCheckedException("Unable to remove 
cache directory " + cacheDir);
                         }
@@ -1702,7 +1704,7 @@ public class SnapshotRestoreProcess {
         }
         catch (RejectedExecutionException e) {
             log.error("Unable to perform rollback routine, task has been 
rejected " +
-                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+                "[reqId=" + req.requestId() + ", snapshot=" + opCtx0.snpName + 
']');
 
             retFut.onDone(e);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
new file mode 100644
index 00000000000..b2e10f0ac7b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** Message wrapping the restore request ID; used as the init request of the snapshot restore distributed processes. */
+public class SnapshotRestoreStartRequest implements Message {
+    /** Request id. */
+    @Order(0)
+    UUID reqId;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public SnapshotRestoreStartRequest() {
+        // No-op.
+    }
+
+    /** @param reqId Request ID. */
+    public SnapshotRestoreStartRequest(UUID reqId) {
+        this.reqId = reqId;
+    }
+
+    /** @return Request ID. */
+    public UUID requestId() {
+        return reqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 37;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotRestoreStartRequest.class, this);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
new file mode 100644
index 00000000000..9c3ca28d73e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/** Snapshot operation start message: an {@link InitMessage} of the {@code START_SNAPSHOT} distributed process. */
+public class SnapshotStartDiscoveryMessage extends 
InitMessage<SnapshotOperationRequest> implements SnapshotDiscoveryMessage {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Value returned by {@link #needExchange()}; {@code true} when the snapshot request is not incremental. */
+    @Order(0)
+    boolean needExchange;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public SnapshotStartDiscoveryMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param procId Unique process id.
+     * @param req Snapshot initial request.
+     */
+    public SnapshotStartDiscoveryMessage(UUID procId, SnapshotOperationRequest 
req) {
+        super(procId, START_SNAPSHOT, req, req.incremental()); // Last argument: wait for client results only if incremental.
+
+        needExchange = !req.incremental();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needExchange() {
+        return needExchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needAssignPartitions() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotStartDiscoveryMessage.class, this, 
super.toString());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
index 593969d74e5..99467083712 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.performancestatistics;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EventListener;
@@ -76,7 +75,7 @@ public class PerformanceStatisticsProcessor extends 
GridProcessorAdapter {
     private final ArrayList<PerformanceStatisticsStateListener> lsnrs = new 
ArrayList<>();
 
     /** Rotate performance statistics process. */
-    private DistributedProcess<Serializable, Message> rotateProc;
+    private DistributedProcess<Message, Message> rotateProc;
 
     /** Whether performance statistics collection is running. */
     private volatile boolean isRunning;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index ee0a93945a5..73298b6696e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.util.distributed;
 
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -72,7 +71,7 @@ import static 
org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
  * @see InitMessage
  * @see FullMessage
  */
-public class DistributedProcess<I extends Serializable, R extends Message> {
+public class DistributedProcess<I extends Message, R extends Message> {
     /** Process type. */
     private final DistributedProcessType type;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
index f98e2715aae..8eec7bf33b8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.util.distributed;
 
-import java.io.Serializable;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -26,6 +26,8 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -36,24 +38,34 @@ import org.jetbrains.annotations.Nullable;
  * @see FullMessage
  * @see SingleNodeMessage
  */
-public class InitMessage<I extends Serializable> implements 
DiscoveryCustomMessage {
+public class InitMessage<I extends Message> implements Message, 
DiscoveryCustomMessage {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
     /** Custom message ID. */
-    private final IgniteUuid id = IgniteUuid.randomUuid();
+    @Order(0)
+    public IgniteUuid id;
 
     /** Process id. */
-    private final UUID procId;
+    @Order(1)
+    public UUID procId;
 
     /** Process type. */
-    private final int type;
+    @Order(2)
+    public int type;
 
     /** Request. */
-    private final I req;
+    @Order(3)
+    public Message req;
 
     /** Whether coordinator waits client nodes results. */
-    private final boolean waitClnRes;
+    @Order(4)
+    public boolean waitClnRes;
+
+    /** Default constructor for {@link MessageFactory}. */
+    public InitMessage() {
+        // No-op.
+    }
 
     /**
      * @param procId Process id.
@@ -61,6 +73,7 @@ public class InitMessage<I extends Serializable> implements 
DiscoveryCustomMessa
      * @param req Request.
      */
     public InitMessage(UUID procId, DistributedProcessType type, I req, 
boolean waitClnRes) {
+        this.id = IgniteUuid.randomUuid();
         this.procId = procId;
         this.type = type.ordinal();
         this.req = req;
@@ -95,7 +108,7 @@ public class InitMessage<I extends Serializable> implements 
DiscoveryCustomMessa
 
     /** @return Request. */
     public I request() {
-        return req;
+        return (I)req;
     }
 
     /** @return Whether coordinator waits client nodes results. */
@@ -107,4 +120,9 @@ public class InitMessage<I extends Serializable> implements 
DiscoveryCustomMessa
     @Override public String toString() {
         return S.toString(InitMessage.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 31;
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
index 0e08ddd281f..f8d336b395c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.encryption;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
@@ -1029,7 +1029,7 @@ public class CacheGroupKeyChangeTest extends 
AbstractEncryptionTest {
             if (!(customMsg instanceof InitMessage))
                 return;
 
-            InitMessage<Serializable> msg = 
(InitMessage<Serializable>)customMsg;
+            InitMessage<Message> msg = (InitMessage<Message>)customMsg;
 
             if (msg.type() != type.ordinal())
                 return;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index d002ea6466a..f3edd6df687 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -617,7 +617,7 @@ public class IgniteSnapshotManagerSelfTest extends 
AbstractSnapshotSelfTest {
         LogListener matchFinish = LogListener.matches("Cluster-wide snapshot 
operation finished successfully: ").times(entriesCnt).build();
         listenLog.registerListener(matchFinish);
 
-        LogListener matchFullParams = LogListener.matches("incremental=false, 
incIdx=-1").times(4).build();
+        LogListener matchFullParams = LogListener.matches("incremental=false, 
incIdx=-1").times(5).build();
         listenLog.registerListener(matchFullParams);
 
         LogListener matchIncParams = 
LogListener.matches("incremental=true").times(4 * (entriesCnt - 1)).build();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
index 18bbf5a6d2f..5fc16e6534b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
@@ -104,7 +104,7 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
     public void testSkipWaitingFailedClient() throws Exception {
         finishLatchRef.set(new CountDownLatch(NODES_CNT));
 
-        List<DistributedProcess<Integer, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
+        List<DistributedProcess<Message, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
 
         TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
         clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
@@ -115,13 +115,13 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
             nodeIdsRes.add(grid(i).localNode().id());
 
         for (int n = 0; n < NODES_CNT + 1; n++) {
-            DistributedProcess<Integer, Message> dp = new 
TestDistributedProcess(
+            DistributedProcess<Message, Message> dp = new 
TestDistributedProcess(
                 nodeIdsRes, grid(n).context(), (id, req) -> new 
InitMessage<>(id, TEST_PROCESS, req, true));
 
             processes.add(dp);
         }
 
-        processes.get(0).start(UUID.randomUUID(), 0);
+        processes.get(0).start(UUID.randomUUID(), null);
 
         clnCommSpi.waitForBlocked();
 
@@ -137,7 +137,7 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
     public void testChangedCoordinatorAwaitsClientResult() throws Exception {
         finishLatchRef.set(new CountDownLatch(NODES_CNT));
 
-        List<DistributedProcess<Integer, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
+        List<DistributedProcess<Message, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
 
         TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
         clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
@@ -148,13 +148,13 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
             nodeIdsRes.add(grid(i).localNode().id());
 
         for (int n = 0; n < NODES_CNT + 1; n++) {
-            DistributedProcess<Integer, Message> dp = new 
TestDistributedProcess(
+            DistributedProcess<Message, Message> dp = new 
TestDistributedProcess(
                 nodeIdsRes, grid(n).context(), (id, req) -> new 
InitMessage<>(id, TEST_PROCESS, req, true));
 
             processes.add(dp);
         }
 
-        processes.get(0).start(UUID.randomUUID(), 0);
+        processes.get(0).start(UUID.randomUUID(), null);
 
         clnCommSpi.waitForBlocked();
 
@@ -172,12 +172,12 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
     /** */
     private void checkExpectedResults(
         Set<UUID> expNodeIdRes,
-        BiFunction<UUID, Integer, ? extends InitMessage<Integer>> 
initMsgFactory
+        BiFunction<UUID, Message, ? extends InitMessage<Message>> 
initMsgFactory
     ) throws Exception {
-        List<DistributedProcess<Integer, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
+        List<DistributedProcess<Message, Message>> processes = new 
ArrayList<>(NODES_CNT + 1);
 
         for (int n = 0; n < NODES_CNT + 1; n++) {
-            DistributedProcess<Integer, Message> dp = new 
TestDistributedProcess(
+            DistributedProcess<Message, Message> dp = new 
TestDistributedProcess(
                 expNodeIdRes, grid(n).context(), initMsgFactory);
 
             processes.add(dp);
@@ -187,7 +187,7 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
             failRef.set(null);
             finishLatchRef.set(new CountDownLatch(NODES_CNT + 1));
 
-            processes.get(n).start(UUID.randomUUID(), 0);
+            processes.get(n).start(UUID.randomUUID(), null);
 
             finishLatchRef.get().await(getTestTimeout(), MILLISECONDS);
 
@@ -196,12 +196,12 @@ public class DistributedProcessClientAwaitTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    private static class TestDistributedProcess extends 
DistributedProcess<Integer, Message> {
+    private static class TestDistributedProcess extends 
DistributedProcess<Message, Message> {
         /** */
         public TestDistributedProcess(
             Set<UUID> expNodeIdsRes,
             GridKernalContext ctx,
-            BiFunction<UUID, Integer, ? extends InitMessage<Integer>> 
initMsgFactory
+            BiFunction<UUID, Message, ? extends InitMessage<Message>> 
initMsgFactory
         ) {
             super(
                 ctx,
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
index 13203ae8bee..e907d130bc0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
@@ -112,12 +112,12 @@ public class DistributedProcessCoordinatorLeftTest 
extends GridCommonAbstractTes
         CountDownLatch startLatch = new CountDownLatch(NODES_CNT);
         CountDownLatch finishLatch = new CountDownLatch(NODES_CNT - 1);
 
-        HashMap<String, DistributedProcess<Integer, TestIntegerMessage>> 
processes = new HashMap<>();
+        HashMap<String, DistributedProcess<TestIntegerMessage, 
TestIntegerMessage>> processes = new HashMap<>();
 
         int procRes = 1;
 
         for (Ignite grid : G.allGrids()) {
-            DistributedProcess<Integer, TestIntegerMessage> dp = new 
DistributedProcess<>(((IgniteEx)grid).context(),
+            DistributedProcess<TestIntegerMessage, TestIntegerMessage> dp = 
new DistributedProcess<>(((IgniteEx)grid).context(),
                 TEST_PROCESS,
                 req -> {
                     IgniteInternalFuture<TestIntegerMessage> fut = runAsync(() 
-> {
@@ -128,7 +128,7 @@ public class DistributedProcessCoordinatorLeftTest extends 
GridCommonAbstractTes
                             fail("Unexpected interrupt.");
                         }
 
-                        return new TestIntegerMessage(req);
+                        return new TestIntegerMessage(req.value());
                     });
 
                     // A single message will be sent before this latch 
released.
@@ -151,7 +151,7 @@ public class DistributedProcessCoordinatorLeftTest extends 
GridCommonAbstractTes
             processes.put(grid.name(), dp);
         }
 
-        processes.get(grid(STOP_NODE_IDX).name()).start(UUID.randomUUID(), 
procRes);
+        processes.get(grid(STOP_NODE_IDX).name()).start(UUID.randomUUID(), new 
TestIntegerMessage(procRes));
 
         assertTrue(startLatch.await(TIMEOUT, MILLISECONDS));
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
index 7254c47cdb4..6ffb7cc7988 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
@@ -137,16 +137,16 @@ public class DistributedProcessErrorHandlingTest extends 
GridCommonAbstractTest
 
     /** */
     private void checkDistributedProcess(
-        BiFunction<IgniteEx, CountDownLatch, DistributedProcess<Integer, 
Message>> processFactory
+        BiFunction<IgniteEx, CountDownLatch, DistributedProcess<Message, 
Message>> processFactory
     ) throws Exception {
-        DistributedProcess<Integer, Message> proc = null;
+        DistributedProcess<Message, Message> proc = null;
 
         CountDownLatch latch = new CountDownLatch(SRV_NODES + 1);
 
         for (Ignite g: G.allGrids())
             proc = processFactory.apply((IgniteEx)g, latch);
 
-        proc.start(UUID.randomUUID(), 0);
+        proc.start(UUID.randomUUID(), null);
 
         assertTrue(latch.await(5, TimeUnit.SECONDS));
 

Reply via email to