This is an automated email from the ASF dual-hosted git repository.

anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 14e949855d9 IGNITE-28334 Replace CalciteMarshalableMessage with MarshallableMessage (#13022)
14e949855d9 is described below

commit 14e949855d99631eaa7e05782b599d28fbca2d22
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Apr 20 19:23:29 2026 +0300

    IGNITE-28334 Replace CalciteMarshalableMessage with MarshallableMessage (#13022)
---
 .../query/calcite/exec/ExchangeServiceImpl.java    | 10 +--
 ...java => CalciteContextMarshallableMessage.java} | 10 +--
 .../query/calcite/message/CalciteErrorMessage.java | 54 ++-----------
 .../query/calcite/message/CalciteMessage.java      | 30 -------
 .../calcite/message/CalciteMessageFactory.java     | 25 +++---
 .../calcite/message/ExecutionContextAware.java     |  3 +-
 .../query/calcite/message/GenericValueMessage.java | 28 +++----
 .../query/calcite/message/MessageListener.java     |  3 +-
 .../query/calcite/message/MessageService.java      |  3 +-
 .../query/calcite/message/MessageServiceImpl.java  | 35 ++++----
 .../query/calcite/message/MessageType.java         | 69 +++++++---------
 .../message/QueryBatchAcknowledgeMessage.java      |  5 --
 .../query/calcite/message/QueryBatchMessage.java   | 60 ++------------
 .../query/calcite/message/QueryCloseMessage.java   |  8 +-
 ...oseMessage.java => QueryInboxCloseMessage.java} | 12 +--
 .../query/calcite/message/QueryStartRequest.java   | 35 ++++----
 .../query/calcite/message/QueryStartResponse.java  |  8 +-
 .../query/calcite/message/QueryTxEntry.java        | 15 ++--
 .../query/calcite/message/ValueMessage.java        | 39 ---------
 .../query/calcite/metadata/ColocationGroup.java    | 17 ++--
 .../calcite/metadata/FragmentDescription.java      | 33 +++-----
 .../query/calcite/metadata/FragmentMapping.java    | 25 +-----
 .../calcite/exec/rel/AbstractExecutionTest.java    | 13 +--
 .../calcite/exec/rel/ContinuousExecutionTest.java  |  6 ++
 .../query/calcite/message/TestIoManager.java       |  3 +-
 .../query/calcite/planner/AbstractPlannerTest.java | 13 +--
 .../query/calcite/planner/PlanExecutionTest.java   | 11 +++
 .../ignite/internal/CoreMessagesProvider.java      | 88 +++++++-------------
 .../org/apache/ignite/internal/IgniteKernal.java   | 16 ++--
 .../managers/communication/GridIoManager.java      |  9 +--
 ...AbstractMarshallableMessageFactoryProvider.java | 93 ++++++++++++++++++++++
 .../direct/DirectMarshallingMessagesTest.java      |  2 +-
 .../IgniteCoreMessagesSerializationTest.java       |  2 +-
 .../communication/CompressedMessageTest.java       |  2 +-
 ...niteCacheContinuousQueryImmutableEntryTest.java |  2 +-
 .../GridAbstractCommunicationSelfTest.java         |  2 +-
 ...pCommunicationSpiConcurrentConnectSelfTest.java |  2 +-
 .../tcp/GridTcpCommunicationSpiConfigSelfTest.java |  2 +-
 ...idTcpCommunicationSpiMultithreadedSelfTest.java |  2 +-
 ...GridTcpCommunicationSpiRecoveryAckSelfTest.java |  2 +-
 .../GridTcpCommunicationSpiRecoverySelfTest.java   |  2 +-
 ...TcpCommunicationRecoveryAckClosureSelfTest.java |  2 +-
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java     |  2 +-
 .../ignite/testframework/GridSpiTestContext.java   |  2 +-
 .../ignite/testframework/junits/IgniteMock.java    |  3 +-
 45 files changed, 318 insertions(+), 490 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 039b14ed9e9..ebf5c85c533 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -32,12 +32,12 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
-import 
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.QueryInboxCloseMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
 import 
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -165,7 +165,7 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
 
     /** {@inheritDoc} */
     @Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, 
long exchangeId) throws IgniteCheckedException {
-        messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, 
exchangeId));
+        messageService().send(nodeId, new QueryInboxCloseMessage(qryId, 
fragmentId, exchangeId));
     }
 
     /** {@inheritDoc} */
@@ -188,8 +188,8 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
 
     /** {@inheritDoc} */
     @Override public void init() {
-        messageService().register((n, m) -> onMessage(n, 
(InboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
-        messageService().register((n, m) -> onMessage(n, 
(QueryBatchAcknowledgeMessage)m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, 
(QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+        messageService().register((n, m) -> onMessage(n, 
(QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
         messageService().register((n, m) -> onMessage(n, 
(QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
         messageService().register((n, m) -> onMessage(n, 
(QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
     }
@@ -221,7 +221,7 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     }
 
     /** */
-    protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
+    protected void onMessage(UUID nodeId, QueryInboxCloseMessage msg) {
         Collection<Inbox<?>> inboxes = 
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
 
         if (!F.isEmpty(inboxes)) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
similarity index 78%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
index b671639ebd6..f6ba42de959 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
@@ -19,11 +19,10 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
-/**
- *
- */
-public interface CalciteMarshalableMessage extends CalciteMessage {
+/** A Calcite engine related message which requires marshalling with context. 
*/
+public interface CalciteContextMarshallableMessage extends Message {
     /**
      * Prepares the message before sending.
      *
@@ -35,6 +34,7 @@ public interface CalciteMarshalableMessage extends 
CalciteMessage {
      * Prepares the message before processing.
      *
      * @param ctx Cache shared context.
+     * @param clsLdr Class loader.
      */
-    void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws 
IgniteCheckedException;
+    void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader clsLdr) 
throws IgniteCheckedException;
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
index ba20b2526b5..9d62635cc22 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
@@ -18,17 +18,11 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 
-/**
- *
- */
-public class CalciteErrorMessage implements CalciteMarshalableMessage {
+/** */
+public class CalciteErrorMessage extends ErrorMessage {
     /** */
     @Order(0)
     UUID qryId;
@@ -37,14 +31,6 @@ public class CalciteErrorMessage implements 
CalciteMarshalableMessage {
     @Order(1)
     long fragmentId;
 
-    /** Error bytes. */
-    @Order(2)
-    @GridToStringExclude
-    @Nullable public byte[] errBytes;
-
-    /** Error. */
-    private @Nullable Throwable err;
-
     /** */
     public CalciteErrorMessage() {
         // No-op.
@@ -52,47 +38,21 @@ public class CalciteErrorMessage implements 
CalciteMarshalableMessage {
 
     /** */
     public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
+        super(err);
+
         assert err != null;
 
         this.qryId = qryId;
         this.fragmentId = fragmentId;
-        this.err = err;
     }
 
-    /**
-     * @return Query ID.
-     */
+    /** @return Query ID. */
     public UUID queryId() {
         return qryId;
     }
 
-    /**
-     * @return Fragment ID.
-     */
+    /** @return Fragment ID. */
     public long fragmentId() {
         return fragmentId;
     }
-
-    /** */
-    public @Nullable Throwable error() {
-        return err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_ERROR_MESSAGE;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (err != null)
-            errBytes = U.marshal(ctx.marshaller(), err);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (errBytes != null)
-            err = U.unmarshal(ctx.marshaller(), errBytes, 
U.resolveClassLoader(ctx.cache().context().gridConfig()));
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
deleted file mode 100644
index d23affbb5c3..00000000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import org.apache.ignite.plugin.extensions.communication.Message;
-
-/**
- *
- */
-public interface CalciteMessage extends Message {
-    /**
-     * @return Message type.
-     */
-    MessageType type();
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
index 1d707cf0c85..f3a9d6f431d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
@@ -17,28 +17,21 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.function.Supplier;
+import org.apache.ignite.internal.MarshallableMessage;
+import 
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 
 /**
  * Message factory.
  */
-public class CalciteMessageFactory implements MessageFactoryProvider {
+public class CalciteMessageFactory extends 
AbstractMarshallableMessageFactoryProvider {
     /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override public void registerAll(MessageFactory factory) {
-        for (MessageType type : MessageType.values())
-            factory.register(type.directType(), (Supplier)type.factory(), 
type.serializer());
-    }
-
-    /**
-     * Produces a value message.
-     */
-    public static ValueMessage asMessage(Object val) {
-        if (val == null)
-            return null;
-
-        return new GenericValueMessage(val);
+        for (MessageType type : MessageType.values()) {
+            if 
(MarshallableMessage.class.isAssignableFrom(type.messageClass()))
+                register(factory, type.messageClass(), type.directType(), 
schemaAwareMarsh, resolvedClsLdr);
+            else
+                register(factory, type.messageClass(), type.directType(), 
dfltMarsh, dftlClsLdr);
+        }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
index 883f504af47..5710a68fc70 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
@@ -18,11 +18,12 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Execution context is used to determine a stripe where to process a message.
  */
-public interface ExecutionContextAware extends CalciteMessage {
+public interface ExecutionContextAware extends Message {
     /**
      * @return Query ID.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
index b03659defa9..40849bab6a7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
@@ -18,14 +18,13 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
 
-/**
- *
- */
-public final class GenericValueMessage implements ValueMessage {
+/** */
+public final class GenericValueMessage implements MarshallableMessage {
     /** */
     private Object val;
 
@@ -35,7 +34,7 @@ public final class GenericValueMessage implements 
ValueMessage {
 
     /** */
     public GenericValueMessage() {
-
+        // No-op.
     }
 
     /** */
@@ -43,25 +42,22 @@ public final class GenericValueMessage implements 
ValueMessage {
         this.val = val;
     }
 
-    /** {@inheritDoc} */
-    @Override public Object value() {
+    /** */
+    public Object value() {
         return val;
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
         if (val != null && serialized == null)
-            serialized = U.marshal(ctx, val);
+            serialized = U.marshal(marsh, val);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
         if (serialized != null && val == null)
-            val = U.unmarshal(ctx, serialized, 
U.resolveClassLoader(ctx.gridConfig()));
-    }
+            val = U.unmarshal(marsh, serialized, clsLdr);
 
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.GENERIC_VALUE_MESSAGE;
+        serialized = null;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
index 61fc8977157..68e2d2710bd 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
@@ -27,5 +28,5 @@ public interface MessageListener {
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    void onMessage(UUID nodeId, CalciteMessage msg);
+    void onMessage(UUID nodeId, Message msg);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 96b2aee7c2f..caa1de990fe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
@@ -31,7 +32,7 @@ public interface MessageService extends Service {
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+    void send(UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
      * Checks whether a node with given ID is alive.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index 2cea51aa5cf..dd421bf1f6c 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -17,7 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -49,6 +50,9 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     /** */
     private final GridCacheSharedContext<?, ?> ctx;
 
+    /** */
+    private final ClassLoader clsLdr;
+
     /** */
     private UUID localNodeId;
 
@@ -62,14 +66,15 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     private FailureProcessor failureProcessor;
 
     /** */
-    private EnumMap<MessageType, MessageListener> lsnrs;
+    private Map<Short, MessageListener> lsnrs;
 
     /** */
     public MessageServiceImpl(GridKernalContext ctx) {
         super(ctx);
 
         this.ctx = ctx.cache().context();
-        this.ioManager = ctx.io();
+        clsLdr = U.resolveClassLoader(ctx.config());
+        ioManager = ctx.io();
         msgLsnr = this::onMessage;
     }
 
@@ -146,7 +151,7 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     }
 
     /** {@inheritDoc} */
-    @Override public void send(UUID nodeId, CalciteMessage msg) throws 
IgniteCheckedException {
+    @Override public void send(UUID nodeId, Message msg) throws 
IgniteCheckedException {
         if (localNodeId().equals(nodeId))
             onMessage(nodeId, msg);
         else {
@@ -159,9 +164,9 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     /** {@inheritDoc} */
     @Override public void register(MessageListener lsnr, MessageType type) {
         if (lsnrs == null)
-            lsnrs = new EnumMap<>(MessageType.class);
+            lsnrs = new HashMap<>();
 
-        MessageListener old = lsnrs.put(type, lsnr);
+        MessageListener old = lsnrs.put(type.directType(), lsnr);
 
         assert old == null : old;
     }
@@ -179,8 +184,8 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     /** */
     protected void prepareMarshal(Message msg) throws IgniteCheckedException {
         try {
-            if (msg instanceof CalciteMarshalableMessage)
-                ((CalciteMarshalableMessage)msg).prepareMarshal(ctx);
+            if (msg instanceof CalciteContextMarshallableMessage)
+                ((CalciteContextMarshallableMessage)msg).prepareMarshal(ctx);
         }
         catch (Exception e) {
             failureProcessor().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -192,8 +197,8 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     /** */
     protected void prepareUnmarshal(Message msg) throws IgniteCheckedException 
{
         try {
-            if (msg instanceof CalciteMarshalableMessage)
-                ((CalciteMarshalableMessage)msg).prepareUnmarshal(ctx);
+            if (msg instanceof CalciteContextMarshallableMessage)
+                ((CalciteContextMarshallableMessage)msg).finishUnmarshal(ctx, 
clsLdr);
         }
         catch (Exception e) {
             failureProcessor().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -203,7 +208,7 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     }
 
     /** */
-    protected void onMessage(UUID nodeId, CalciteMessage msg) {
+    protected void onMessage(UUID nodeId, Message msg) {
         if (msg instanceof ExecutionContextAware) {
             ExecutionContextAware msg0 = (ExecutionContextAware)msg;
             taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> 
onMessageInternal(nodeId, msg));
@@ -218,16 +223,16 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
 
     /** */
     private void onMessage(UUID nodeId, Object msg, byte plc) {
-        if (msg instanceof CalciteMessage)
-            onMessage(nodeId, (CalciteMessage)msg);
+        if (msg instanceof Message && 
MessageType.isCalciteMessage((Message)msg))
+            onMessage(nodeId, (Message)msg);
     }
 
     /** */
-    private void onMessageInternal(UUID nodeId, CalciteMessage msg) {
+    private void onMessageInternal(UUID nodeId, Message msg) {
         try {
             prepareUnmarshal(msg);
 
-            MessageListener lsnr = 
Objects.requireNonNull(lsnrs.get(msg.type()));
+            MessageListener lsnr = 
Objects.requireNonNull(lsnrs.get(msg.directType()));
             lsnr.onMessage(nodeId, msg);
         }
         catch (IgniteCheckedException e) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index d0ce95c54da..045dac80f69 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -17,93 +17,82 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.function.Supplier;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroupSerializer;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescriptionSerializer;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
-import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingSerializer;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
-/**
- *
- */
+/** */
 public enum MessageType {
     /** */
-    QUERY_START_REQUEST(300, QueryStartRequest::new, new 
QueryStartRequestSerializer()),
+    QUERY_START_REQUEST(300, QueryStartRequest.class),
 
     /** */
-    QUERY_START_RESPONSE(301, QueryStartResponse::new, new 
QueryStartResponseSerializer()),
+    QUERY_START_RESPONSE(301, QueryStartResponse.class),
 
     /** */
-    QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new 
CalciteErrorMessageSerializer()),
+    QUERY_ERROR_MESSAGE(302, CalciteErrorMessage.class),
 
     /** */
-    QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new 
QueryBatchMessageSerializer()),
+    QUERY_BATCH_MESSAGE(303, QueryBatchMessage.class),
 
     /** */
-    QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new, new 
QueryBatchAcknowledgeMessageSerializer()),
+    QUERY_BATCH_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage.class),
 
     /** */
-    QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new, new 
InboxCloseMessageSerializer()),
+    QUERY_INBOX_CANCEL_MESSAGE(305, QueryInboxCloseMessage.class),
 
     /** */
-    QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new, new 
QueryCloseMessageSerializer()),
+    QUERY_CLOSE_MESSAGE(306, QueryCloseMessage.class),
 
     /** */
-    GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new, new 
GenericValueMessageSerializer()),
+    GENERIC_VALUE_MESSAGE(307, GenericValueMessage.class),
 
     /** */
-    FRAGMENT_MAPPING(350, FragmentMapping::new, new 
FragmentMappingSerializer()),
+    FRAGMENT_MAPPING(350, FragmentMapping.class),
 
     /** */
-    COLOCATION_GROUP(351, ColocationGroup::new, new 
ColocationGroupSerializer()),
+    COLOCATION_GROUP(351, ColocationGroup.class),
 
     /** */
-    FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new 
FragmentDescriptionSerializer()),
+    FRAGMENT_DESCRIPTION(352, FragmentDescription.class),
 
     /** */
-    QUERY_TX_ENTRY(353, QueryTxEntry::new, new QueryTxEntrySerializer());
+    QUERY_TX_ENTRY(353, QueryTxEntry.class);
 
     /** */
-    private final int directType;
+    private final short directType;
 
     /** */
-    private final Supplier<CalciteMessage> factory;
-
-    /** */
-    private final MessageSerializer serializer;
+    private final Class<? extends Message> msgCls;
 
     /**
      * @param directType Direct type.
-     * @param factory Message factory.
-     * @param serializer Message serializer.
      */
-    MessageType(int directType, Supplier<CalciteMessage> factory, 
MessageSerializer serializer) {
-        this.directType = directType;
-        this.factory = factory;
-        this.serializer = serializer;
+    MessageType(int directType, Class<? extends Message> msgCls) {
+        this.directType = (short)directType;
+        this.msgCls = msgCls;
     }
 
-    /**
-     * @return Message factory.
-     */
-    public Supplier<CalciteMessage> factory() {
-        return factory;
+    /** */
+    static boolean isCalciteMessage(Message msg) {
+        MessageType[] values = values();
+        short msgType = msg.directType();
+
+        return msgType >= values[0].directType() && msgType <= 
values[values.length - 1].directType();
     }
 
     /**
      * @return Message direct type.
      */
     public short directType() {
-        return (short)directType;
+        return directType;
     }
 
     /**
-     * @return Message serializer.
+     * @return Message class.
      */
-    public MessageSerializer serializer() {
-        return serializer;
+    public Class<? extends Message> messageClass() {
+        return msgCls;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
index ec7b04e05ad..acf2b31bedc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
@@ -76,9 +76,4 @@ public class QueryBatchAcknowledgeMessage implements 
ExecutionContextAware {
     public int batchId() {
         return batchId;
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_ACKNOWLEDGE_MESSAGE;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
index c21297c3a5c..487baff9fdc 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
@@ -17,17 +17,13 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 
-/**
- *
- */
-public class QueryBatchMessage implements CalciteMarshalableMessage, 
ExecutionContextAware {
+/** */
+public class QueryBatchMessage implements ExecutionContextAware {
     /** */
     @Order(0)
     UUID qryId;
@@ -48,15 +44,13 @@ public class QueryBatchMessage implements 
CalciteMarshalableMessage, ExecutionCo
     @Order(4)
     boolean last;
 
-    /** */
-    private List<Object> rows;
-
     /** */
     @Order(5)
-    List<ValueMessage> mRows;
+    List<GenericValueMessage> mRows;
 
     /** */
     public QueryBatchMessage() {
+        // No-op.
     }
 
     /** */
@@ -66,7 +60,8 @@ public class QueryBatchMessage implements 
CalciteMarshalableMessage, ExecutionCo
         this.exchangeId = exchangeId;
         this.batchId = batchId;
         this.last = last;
-        this.rows = rows;
+
+        mRows = rows.stream().map(o -> o == null ? null : new 
GenericValueMessage(o)).collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
@@ -104,45 +99,6 @@ public class QueryBatchMessage implements 
CalciteMarshalableMessage, ExecutionCo
      * @return Rows.
      */
     public List<Object> rows() {
-        return rows;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (mRows != null || rows == null)
-            return;
-
-        mRows = new ArrayList<>(rows.size());
-
-        for (Object row : rows) {
-            ValueMessage mRow = CalciteMessageFactory.asMessage(row);
-
-            assert mRow != null;
-
-            mRow.prepareMarshal(ctx);
-
-            mRows.add(mRow);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (rows != null || mRows == null)
-            return;
-
-        rows = new ArrayList<>(mRows.size());
-
-        for (ValueMessage mRow : mRows) {
-            assert mRow != null;
-
-            mRow.prepareUnmarshal(ctx);
-
-            rows.add(mRow.value());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_BATCH_MESSAGE;
+        return 
mRows.stream().map(GenericValueMessage::value).collect(Collectors.toList());
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
index 6db451d285c..4972314aa0a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
@@ -19,11 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
 import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
  */
-public class QueryCloseMessage implements CalciteMessage {
+public class QueryCloseMessage implements Message {
     /** */
     @Order(0)
     UUID qryId;
@@ -44,9 +45,4 @@ public class QueryCloseMessage implements CalciteMessage {
     public UUID queryId() {
         return qryId;
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_CLOSE_MESSAGE;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
similarity index 83%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
index 19638738da0..532c024d657 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
@@ -19,11 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
 import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
  */
-public class InboxCloseMessage implements CalciteMessage {
+public class QueryInboxCloseMessage implements Message {
     /** */
     @Order(0)
     UUID qryId;
@@ -37,12 +38,12 @@ public class InboxCloseMessage implements CalciteMessage {
     long exchangeId;
 
     /** */
-    public InboxCloseMessage() {
+    public QueryInboxCloseMessage() {
         // No-op.
     }
 
     /** */
-    public InboxCloseMessage(UUID qryId, long fragmentId, long exchangeId) {
+    public QueryInboxCloseMessage(UUID qryId, long fragmentId, long 
exchangeId) {
         this.qryId = qryId;
         this.fragmentId = fragmentId;
         this.exchangeId = exchangeId;
@@ -68,9 +69,4 @@ public class InboxCloseMessage implements CalciteMessage {
     public long exchangeId() {
         return exchangeId;
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_INBOX_CANCEL_MESSAGE;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index bc0817f2473..15d34e00fbb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -21,17 +21,19 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class QueryStartRequest implements CalciteMarshalableMessage, 
ExecutionContextAware {
+public class QueryStartRequest implements MarshallableMessage, 
CalciteContextMarshallableMessage, ExecutionContextAware {
     /** */
     @Order(0)
     String schema;
@@ -116,7 +118,9 @@ public class QueryStartRequest implements 
CalciteMarshalableMessage, ExecutionCo
     }
 
     /** */
-    QueryStartRequest() {}
+    public QueryStartRequest() {
+        // No-op.
+    }
 
     /**
      * @return Schema name.
@@ -211,12 +215,13 @@ public class QueryStartRequest implements 
CalciteMarshalableMessage, ExecutionCo
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
         if (paramsBytes == null && params != null)
-            paramsBytes = U.marshal(ctx, params);
-
-        fragmentDesc.prepareMarshal(ctx);
+            paramsBytes = U.marshal(marsh, params);
+    }
 
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
         if (qryTxEntries != null) {
             for (QueryTxEntry e : qryTxEntries)
                 e.prepareMarshal(ctx);
@@ -224,22 +229,18 @@ public class QueryStartRequest implements 
CalciteMarshalableMessage, ExecutionCo
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        ClassLoader ldr = U.resolveClassLoader(ctx.gridConfig());
-
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
         if (params == null && paramsBytes != null)
-            params = U.unmarshal(ctx, paramsBytes, ldr);
+            params = U.unmarshal(marsh, paramsBytes, clsLdr);
 
-        fragmentDesc.prepareUnmarshal(ctx);
+        paramsBytes = null;
+    }
 
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader clsLdr) throws IgniteCheckedException {
         if (qryTxEntries != null) {
             for (QueryTxEntry e : qryTxEntries)
-                e.prepareUnmarshal(ctx, ldr);
+                e.finishUnmarshal(ctx, clsLdr);
         }
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_START_REQUEST;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
index f908cb61383..afc9d7c7812 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
@@ -20,12 +20,13 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 import java.util.UUID;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.communication.ErrorMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class QueryStartResponse implements CalciteMessage {
+public class QueryStartResponse implements Message {
     /** */
     @Order(0)
     UUID qryId;
@@ -70,9 +71,4 @@ public class QueryStartResponse implements CalciteMessage {
     public @Nullable Throwable error() {
         return ErrorMessage.error(errMsg);
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_START_RESPONSE;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
index 7961f589d83..36f2d9dceee 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
@@ -38,7 +38,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
  * @see ExecutionContext#transactionChanges(int, int[], Function, Comparator)
  * @see QueryStartRequest#queryTransactionEntries()
  */
-public class QueryTxEntry implements CalciteMessage {
+public class QueryTxEntry implements CalciteContextMarshallableMessage {
     /** Cache id. */
     @Order(0)
     int cacheId;
@@ -95,8 +95,8 @@ public class QueryTxEntry implements CalciteMessage {
         return ver;
     }
 
-    /** */
-    public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws 
IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
         CacheObjectContext coctx = 
ctx.cacheContext(cacheId).cacheObjectContext();
 
         key.prepareMarshal(coctx);
@@ -105,8 +105,8 @@ public class QueryTxEntry implements CalciteMessage {
             val.prepareMarshal(coctx);
     }
 
-    /** */
-    public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader 
ldr) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         CacheObjectContext coctx = 
ctx.cacheContext(cacheId).cacheObjectContext();
 
         key.finishUnmarshal(coctx, ldr);
@@ -114,9 +114,4 @@ public class QueryTxEntry implements CalciteMessage {
         if (val != null)
             val.finishUnmarshal(coctx, ldr);
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.QUERY_TX_ENTRY;
-    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
deleted file mode 100644
index 901bcbfafa4..00000000000
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-
-/** */
-public interface ValueMessage extends CalciteMarshalableMessage {
-    /**
-     * @return Wrapped value.
-     */
-    Object value();
-
-    /** {@inheritDoc} */
-    @Override default void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        // No-op
-    }
-
-    /** {@inheritDoc} */
-    @Override default void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        // No-op
-    }
-}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 3e4f8001589..7981d736953 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -30,19 +30,18 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
 
 /** */
-public class ColocationGroup implements CalciteMarshalableMessage {
+public class ColocationGroup implements MarshallableMessage {
     /** */
     @Order(0)
     long[] srcIds;
@@ -102,6 +101,7 @@ public class ColocationGroup implements 
CalciteMarshalableMessage {
 
     /** */
     public ColocationGroup() {
+        // No-op.
     }
 
     /** */
@@ -314,12 +314,7 @@ public class ColocationGroup implements 
CalciteMarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.COLOCATION_GROUP;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
         if (assignments == null || primaryAssignment)
             return;
 
@@ -348,7 +343,7 @@ public class ColocationGroup implements 
CalciteMarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
         if (F.isEmpty(marshalledAssignments))
             return;
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index 009a9c6c9e3..7560be273c1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -21,13 +21,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
+import org.apache.ignite.marshaller.Marshaller;
 
 /** */
-public class FragmentDescription implements CalciteMarshalableMessage {
+public class FragmentDescription implements MarshallableMessage {
     /** */
     @Order(0)
     long fragmentId;
@@ -46,6 +45,7 @@ public class FragmentDescription implements 
CalciteMarshalableMessage {
 
     /** */
     public FragmentDescription() {
+        // No-op.
     }
 
     /** */
@@ -97,27 +97,14 @@ public class FragmentDescription implements 
CalciteMarshalableMessage {
         this.mapping = mapping;
     }
 
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.FRAGMENT_DESCRIPTION;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (target != null) {
+    /** */
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
+        if (target != null)
             target = target.explicitMapping();
-
-            target.prepareMarshal(ctx);
-        }
-
-        if (mapping != null)
-            mapping.prepareMarshal(ctx);
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        target.prepareUnmarshal(ctx);
-
-        mapping.prepareUnmarshal(ctx);
+    /** */
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
+        // No-op.
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index 910b990476b..db8368e351d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -23,26 +23,24 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
-public class FragmentMapping implements CalciteMarshalableMessage {
+public class FragmentMapping implements Message {
     /** */
     @Order(0)
     List<ColocationGroup> colocationGrps;
 
     /** */
     public FragmentMapping() {
+        // No-op.
     }
 
     /** */
@@ -174,21 +172,4 @@ public class FragmentMapping implements 
CalciteMarshalableMessage {
         return new FragmentMapping(Commons.transform(colocationGrps,
             g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g));
     }
-
-    /** {@inheritDoc} */
-    @Override public MessageType type() {
-        return MessageType.FRAGMENT_MAPPING;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        for (ColocationGroup grp : colocationGrps)
-            grp.prepareMarshal(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        for (ColocationGroup grp : colocationGrps)
-            grp.prepareUnmarshal(ctx);
-    }
 }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 6cf79d4f5df..e67cd18885e 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -55,7 +55,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlocki
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -389,7 +388,7 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(UUID nodeId, CalciteMessage msg) {
+        @Override public void send(UUID nodeId, Message msg) {
             mgr.send(localNodeId(), nodeId, msg);
         }
 
@@ -397,16 +396,6 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
         @Override public boolean alive(UUID nodeId) {
             return true;
         }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareMarshal(Message msg) {
-            // No-op;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareUnmarshal(Message msg) {
-            // No-op;
-        }
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index efd4bffc4ec..af835b4f0ad 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -27,11 +27,14 @@ import java.util.UUID;
 import java.util.stream.Stream;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory;
 import org.apache.ignite.internal.processors.query.calcite.trait.AllNodes;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runners.Parameterized;
@@ -94,6 +97,9 @@ public class ContinuousExecutionTest extends 
AbstractExecutionTest {
     @Override public void setup() throws Exception {
         nodesCnt = remoteFragmentsCnt + 1;
         super.setup();
+
+        // Registers messages in Message#REGISTRATIONS and avoids a failure in 
Message#directType().
+        new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new 
CalciteMessageFactory()});
     }
 
     /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
index 538b1f6426a..eaef5d2afe6 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  *
@@ -29,7 +30,7 @@ public class TestIoManager {
     private final Map<UUID, MessageServiceImpl> srvcMap = new 
ConcurrentHashMap<>();
 
     /** */
-    public void send(UUID senderId, UUID nodeId, CalciteMessage msg) {
+    public void send(UUID senderId, UUID nodeId, Message msg) {
         srvcMap.get(nodeId).onMessage(senderId, msg);
     }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index b0310c799ce..e91e894e7c2 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -53,7 +53,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
-import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -740,7 +739,7 @@ public abstract class AbstractPlannerTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void send(UUID nodeId, CalciteMessage msg) {
+        @Override public void send(UUID nodeId, Message msg) {
             mgr.send(localNodeId(), nodeId, msg);
         }
 
@@ -748,16 +747,6 @@ public abstract class AbstractPlannerTest extends 
GridCommonAbstractTest {
         @Override public boolean alive(UUID nodeId) {
             return true;
         }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareMarshal(Message msg) {
-            // No-op;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareUnmarshal(Message msg) {
-            // No-op;
-        }
     }
 
     /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
index 11483e74181..75530ae0304 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
@@ -40,6 +41,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -63,6 +65,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import 
org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.internal.util.typedef.F;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.junit.Assert;
 import org.junit.Test;
@@ -75,6 +78,14 @@ import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KE
  */
 @SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
 public class PlanExecutionTest extends AbstractPlannerTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Registers messages in Message#REGISTRATIONS and avoids a failure in 
Message#directType().
+        new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new 
CalciteMessageFactory()});
+    }
+
     /**
      * @throws Exception If failed.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index cb076954ad8..aa5d1e210cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal;
 
-import java.lang.reflect.Constructor;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
@@ -40,6 +39,7 @@ import 
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyReque
 import 
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
 import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
+import 
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
 import org.apache.ignite.internal.processors.authentication.User;
 import 
org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
 import 
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
@@ -230,14 +230,10 @@ import 
org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.distributed.FullMessage;
 import org.apache.ignite.internal.util.distributed.InitMessage;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
 import 
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
 import 
org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -278,7 +274,7 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa
 import org.jetbrains.annotations.Nullable;
 
 /** */
-public class CoreMessagesProvider implements MessageFactoryProvider {
+public class CoreMessagesProvider extends 
AbstractMarshallableMessageFactoryProvider {
     /** Node ID message type. */
     public static final short NODE_ID_MSG_TYPE = 11500;
 
@@ -291,15 +287,6 @@ public class CoreMessagesProvider implements 
MessageFactoryProvider {
     /** */
     public static final short MAX_MESSAGE_ID = 15_000;
 
-    /** Binary marshaller. */
-    private final Marshaller schemaAwareMarhaller;
-
-    /** Binary marshaller. */
-    private final Marshaller schemaLessMarshaller;
-
-    /** Resolved classloader. */
-    private final ClassLoader resolvedClsLdr;
-
     /** */
     private short msgIdx;
 
@@ -307,14 +294,22 @@ public class CoreMessagesProvider implements 
MessageFactoryProvider {
     private @Nullable MessageFactory factory;
 
     /**
-     * @param schemaAwareMarhaller Schema-aware marshaller like {@link 
BinaryMarshaller}.
-     * @param schemaLessMarshaller Pure, schemaless marshaller like {@link 
JdkMarshaller}.
-     * @param resolvedClsLdr Resolved classloader.
+     * Default constructor for plugin purposes.
+     *
+     * @see #init(Marshaller, ClassLoader)
      */
-    public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller 
schemaLessMarshaller, ClassLoader resolvedClsLdr) {
-        this.schemaAwareMarhaller = schemaAwareMarhaller;
-        this.schemaLessMarshaller = schemaLessMarshaller;
-        this.resolvedClsLdr = resolvedClsLdr;
+    public CoreMessagesProvider() {
+        // No-op.
+    }
+
+    /**
+     * Constructor that calls {@link #init(Marshaller, ClassLoader)} itself, so no separate call is needed.
+     *
+     * @param schemaAwareMarsh Schema-aware marshaller like {@link 
BinaryMarshaller}.
+     * @param resolvedClsLdr Resolved (configured) class loader like {@link 
IgniteConfiguration#setClassLoader(ClassLoader)}.
+     */
+    public CoreMessagesProvider(Marshaller schemaAwareMarsh, ClassLoader 
resolvedClsLdr) {
+        init(schemaAwareMarsh, resolvedClsLdr);
     }
 
     /** The order is important. If wish to remove a message, put 'msgIdx++' on 
its place. */
@@ -644,52 +639,23 @@ public class CoreMessagesProvider implements 
MessageFactoryProvider {
         assert msgIdx <= MAX_MESSAGE_ID;
     }
 
-    /** Registers message using {@link #schemaAwareMarhaller} and {@link 
U#gridClassLoader()}. */
-    private <T extends Message> void withSchema(Class<T> cls) {
-        register(cls, schemaAwareMarhaller, U.gridClassLoader());
+    /** Registers message using {@link #dfltMarsh} and {@link #dftlClsLdr}. */
+    private <T extends Message> void withNoSchema(Class<T> cls) {
+        register(cls, dfltMarsh, dftlClsLdr);
     }
 
-    /** Registers message using {@link #schemaLessMarshaller} and {@link 
U#gridClassLoader()}. */
-    private <T extends Message> void withNoSchema(Class<T> cls) {
-        register(cls, schemaLessMarshaller, U.gridClassLoader());
+    /** Registers message using {@link #schemaAwareMarsh} and {@link 
#dftlClsLdr}. */
+    private <T extends Message> void withSchema(Class<T> cls) {
+        register(cls, schemaAwareMarsh, dftlClsLdr);
     }
 
-    /** Registers message using {@link #schemaLessMarshaller} and {@link 
#resolvedClsLdr}. */
+    /** Registers message using {@link #dfltMarsh} and {@link 
#resolvedClsLdr}. */
     private <T extends Message> void withNoSchemaResolvedClassLoader(Class<T> 
cls) {
-        register(cls, schemaLessMarshaller, resolvedClsLdr);
+        register(cls, dfltMarsh, resolvedClsLdr);
     }
 
     /** Registers message using incrementing {@link #msgIdx} as the message 
id/type. */
     private <T extends Message> void register(Class<T> cls, Marshaller marsh, 
ClassLoader clsLrd) {
-        Constructor<T> ctor;
-        MessageSerializer<T> serializer;
-
-        try {
-            ctor = cls.getConstructor();
-
-            boolean marshallable = 
MarshallableMessage.class.isAssignableFrom(cls);
-
-            Class<?> serCls = Class.forName(cls.getName() + (marshallable ? 
"MarshallableSerializer" : "Serializer"));
-
-            serializer = marshallable
-                ? 
(MessageSerializer<T>)serCls.getConstructor(Marshaller.class, 
ClassLoader.class).newInstance(marsh, clsLrd)
-                : (MessageSerializer<T>)serCls.getConstructor().newInstance();
-        }
-        catch (Exception e) {
-            throw new IgniteException("Failed to register message of type " + 
cls.getSimpleName(), e);
-        }
-
-        factory.register(
-            msgIdx++,
-            () -> {
-                try {
-                    return ctor.newInstance();
-                }
-                catch (Exception e) {
-                    throw new IgniteException("Failed to create message of 
type " + cls.getSimpleName(), e);
-                }
-            },
-            serializer
-        );
+        register(factory, cls, msgIdx++, marsh, clsLrd);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 315130fe561..a27c0623eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -112,6 +112,7 @@ import 
org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
 import 
org.apache.ignite.internal.managers.systemview.IgniteConfigurationIterable;
 import org.apache.ignite.internal.managers.tracing.GridTracingManager;
+import 
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
 import org.apache.ignite.internal.plugin.IgniteLogInfoProvider;
 import org.apache.ignite.internal.plugin.IgniteLogInfoProviderImpl;
 import org.apache.ignite.internal.processors.GridProcessor;
@@ -1098,7 +1099,7 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
                 startProcessor(new GridTaskProcessor(ctx));
                 startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
                 startProcessor(createComponent(IgniteRestProcessor.class, 
ctx));
-                startProcessor(new DataStreamProcessor(ctx));
+                startProcessor(new DataStreamProcessor<>(ctx));
                 startProcessor(new GridContinuousProcessor(ctx));
                 startProcessor(new DataStructuresProcessor(ctx));
                 startProcessor(createComponent(PlatformProcessor.class, ctx));
@@ -1114,7 +1115,7 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
                 startTimer.finishGlobalStage("Start processors");
 
                 // Start plugins.
-                for (PluginProvider provider : ctx.plugins().allProviders()) {
+                for (PluginProvider<?> provider : 
ctx.plugins().allProviders()) {
                     ctx.add(new GridPluginComponent(provider));
 
                     
provider.start(ctx.plugins().pluginContextForProvider(provider));
@@ -1255,7 +1256,7 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
             }
 
             // Start plugins.
-            for (PluginProvider provider : ctx.plugins().allProviders())
+            for (PluginProvider<?> provider : ctx.plugins().allProviders())
                 provider.onIgniteStart();
 
             if (recon)
@@ -1319,14 +1320,17 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
 
         List<MessageFactoryProvider> compMsgs = new ArrayList<>();
 
-        compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), 
ctx.marshallerContext().jdkMarshaller(),
-            U.resolveClassLoader(ctx.config())));
+        compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), 
U.resolveClassLoader(ctx.config())));
 
         for (IgniteComponentType compType : IgniteComponentType.values()) {
             MessageFactoryProvider f = compType.messageFactory();
 
-            if (f != null)
+            if (f != null) {
+                if (f instanceof AbstractMarshallableMessageFactoryProvider)
+                    
((AbstractMarshallableMessageFactoryProvider)f).init(ctx.marshaller(), 
U.resolveClassLoader(ctx.config()));
+
                 compMsgs.add(f);
+            }
         }
 
         DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a9d0b451458..ef79d006969 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -346,9 +346,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, 
MAX_CLOSED_TOPICS, 0.75f, 256,
             PER_SEGMENT_Q_OPTIMIZED_RMV);
 
-    /** */
-    private MessageFactory msgFactory;
-
     /** */
     private MessageFormatter formatter;
 
@@ -393,9 +390,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
      * @return Message factory.
      */
     public MessageFactory messageFactory() {
-        assert msgFactory != null;
+        assert ctx.messageFactory() != null;
 
-        return msgFactory;
+        return ctx.messageFactory();
     }
 
     /**
@@ -437,8 +434,6 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
             };
         }
 
-        msgFactory = ctx.messageFactory();
-
         CommunicationSpi<Object> spi = getSpi();
 
         if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
new file mode 100644
index 00000000000..03a65b95b6e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.plugin;
+
+import java.lang.reflect.Constructor;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.Marshallers;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+
+/**
+ * An extension of {@link MessageFactoryProvider} that allows using the provided 
schema-aware marshaller
+ * and resolved class loader to register {@link MarshallableMessage} implementations.
+ */
+public abstract class AbstractMarshallableMessageFactoryProvider implements 
MessageFactoryProvider {
+    /** Default schema-less marshaller. */
+    protected final Marshaller dfltMarsh = Marshallers.jdk();
+
+    /** Default class loader. */
+    protected final ClassLoader dftlClsLdr = U.gridClassLoader();
+
+    /** Schema-aware marshaller like {@link BinaryMarshaller}. */
+    protected Marshaller schemaAwareMarsh;
+
+    /** Resolved (configured) class loader like {@link 
IgniteConfiguration#setClassLoader(ClassLoader)}. */
+    protected ClassLoader resolvedClsLdr;
+
+    /**
+     * @param schemaAwareMarsh Schema-aware marshaller like {@link 
BinaryMarshaller}.
+     * @param resolvedClsLdr Resolved (configured) class loader like {@link 
IgniteConfiguration#setClassLoader(ClassLoader)}.
+     */
+    public void init(Marshaller schemaAwareMarsh, ClassLoader resolvedClsLdr) {
+        this.schemaAwareMarsh = schemaAwareMarsh;
+        this.resolvedClsLdr = resolvedClsLdr;
+    }
+
+    /** Registers a message, automatically generating its supplier and 
serializer. */
+    protected static <T extends Message> void register(MessageFactory factory, 
Class<T> cls, short id, Marshaller marsh, 
+        ClassLoader clsLrd) {
+        Constructor<T> ctor;
+        MessageSerializer<T> serializer;
+
+        try {
+            ctor = cls.getConstructor();
+
+            boolean marshallable = 
MarshallableMessage.class.isAssignableFrom(cls);
+
+            Class<?> serCls = Class.forName(cls.getName() + (marshallable ? 
"MarshallableSerializer" : "Serializer"));
+
+            serializer = marshallable
+                ? 
(MessageSerializer<T>)serCls.getConstructor(Marshaller.class, 
ClassLoader.class).newInstance(marsh, clsLrd)
+                : (MessageSerializer<T>)serCls.getConstructor().newInstance();
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to register message of type " + 
cls.getSimpleName(), e);
+        }
+
+        factory.register(
+            id,
+            () -> {
+                try {
+                    return ctor.newInstance();
+                }
+                catch (Exception e) {
+                    throw new IgniteException("Failed to create message of 
type " + cls.getSimpleName(), e);
+                }
+            },
+            serializer
+        );
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
index ea37b6abe45..a4c91643889 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -46,7 +46,7 @@ public class DirectMarshallingMessagesTest extends 
GridCommonAbstractTest {
     /** Message factory. */
     private final MessageFactory msgFactory =
         new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
-            new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
+            new CoreMessagesProvider(jdk(), U.gridClassLoader()),
             factory -> factory.register(
                 TestNestedContainersMessage.TYPE,
                 TestNestedContainersMessage::new,
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
index 21f22330814..74f1c90b998 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.marshaller.Marshallers.jdk;
 public class IgniteCoreMessagesSerializationTest extends 
AbstractMessageSerializationTest {
     /** {@inheritDoc} */
     @Override protected MessageFactoryProvider messageFactory() {
-        return new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader());
+        return new CoreMessagesProvider(jdk(), U.gridClassLoader());
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
index 53fa342460b..f124eb98228 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -46,7 +46,7 @@ public class CompressedMessageTest {
     @Test
     public void testWriteReadHugeMessage() {
         MessageFactory msgFactory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[]{
-            new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
+            new CoreMessagesProvider(jdk(), U.gridClassLoader())});
 
         DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index a41490c0e40..15ddd54040c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -150,7 +150,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest 
extends GridCommonAbst
         e0.markFiltered();
 
         IgniteMessageFactoryImpl msgFactory =
-            new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new 
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
+            new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new 
CoreMessagesProvider(jdk(), U.gridClassLoader())});
 
         ByteBuffer buf = ByteBuffer.allocate(4096);
         DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 1ca7217b67d..f82c844bda9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -154,7 +154,7 @@ public abstract class GridAbstractCommunicationSelfTest<T 
extends CommunicationS
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[] {
-                new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), 
customMessageFactory()}));
+                new CoreMessagesProvider(jdk(), U.gridClassLoader()), 
customMessageFactory()}));
 
             ctx.setLocalNode(node);
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 997d5b59fd4..1bc2358cc7e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -434,7 +434,7 @@ public class 
GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(
-                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
             );
 
             ctx.setLocalNode(node);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 3f2d90924bf..a604da0edb7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -252,7 +252,7 @@ public class GridTcpCommunicationSpiConfigSelfTest extends 
GridSpiAbstractConfig
         node.setId(rsrcs.getNodeId());
 
         ctx.messageFactory(new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[]{
-            new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), 
GRID_TEST_MESSAGE_FACTORY}));
+            new CoreMessagesProvider(jdk(), U.gridClassLoader()), 
GRID_TEST_MESSAGE_FACTORY}));
 
         ctx.setLocalNode(node);
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index c043c068fc4..6a5d26eb266 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -468,7 +468,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest 
extends GridSpiAbstrac
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactoryProvider[] {new 
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), 
GRID_TEST_MESSAGE_FACTORY})
+                    new MessageFactoryProvider[] {new 
CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
             );
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 56c6daceca5..de46f4b674a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -400,7 +400,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T 
extends CommunicationS
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactoryProvider[] {new 
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), 
GRID_TEST_MESSAGE_FACTORY})
+                    new MessageFactoryProvider[] {new 
CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
             );
 
             ctx.setLocalNode(node);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index d4f00e425a4..633bd332eaf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -729,7 +729,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T 
extends CommunicationSpi<
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(
-                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
             );
 
             ctx.setLocalNode(node);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 58a321e21c8..06d657a8a3b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -453,7 +453,7 @@ public class 
IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
             GridSpiTestContext ctx = initSpiContext();
 
             ctx.messageFactory(new IgniteMessageFactoryImpl(
-                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+                new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), 
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
             );
 
             ctx.setLocalNode(node);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 2aaabee1aeb..decf2bf4b5c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -121,7 +121,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi 
implements IgniteDiscov
         assert !started();
 
         this.msgFactory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[] {
-            new CoreMessagesProvider(jdk(), jdk(), 
U.resolveClassLoader(ignite().configuration())),
+            new CoreMessagesProvider(jdk(), 
U.resolveClassLoader(ignite().configuration())),
             msgFactoryProvider
         });
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 4eb5d79e0b6..b5c5035b7d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -554,7 +554,7 @@ public class GridSpiTestContext implements IgniteSpiContext 
{
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
         if (factory == null)
-            factory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), jdk(),
+            factory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), 
                 U.gridClassLoader())});
 
         return factory;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index fe6becffafc..e6638d4e112 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -85,7 +85,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.Marshallers;
 import org.apache.ignite.metric.IgniteMetrics;
 import org.apache.ignite.plugin.IgnitePlugin;
 import org.apache.ignite.plugin.PluginNotFoundException;
@@ -156,7 +155,7 @@ public class IgniteMock implements IgniteEx {
         ClassLoader lrd = staticCfg == null ? U.gridClassLoader() : 
U.resolveClassLoader(staticCfg);
 
         msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] 
{
-            new CoreMessagesProvider(marshaller, Marshallers.jdk(), lrd)});
+            new CoreMessagesProvider(marshaller, lrd)});
 
         try {
             kernalCtx = new StandaloneGridKernalContext(new 
GridTestLog4jLogger(), null) {


Reply via email to