This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 87aa722260d IGNITE-27627 Remove support of java serialization for tcp 
discovery messages (#13023)
87aa722260d is described below

commit 87aa722260d2ee798e37b79aed1ac3dd205fb9d5
Author: Ilya Shishkov <[email protected]>
AuthorDate: Fri Apr 24 10:19:17 2026 +0300

    IGNITE-27627 Remove support of java serialization for tcp discovery 
messages (#13023)
---
 .../org/apache/ignite/internal/IgniteKernal.java   |  23 +++-
 .../SecurityAwareCustomMessageWrapper.java         |  35 +----
 .../ignite/spi/discovery/tcp/ClientImpl.java       |  52 +++-----
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 100 ++++----------
 .../spi/discovery/tcp/TcpDiscoveryIoSession.java   |  53 +++-----
 .../tcp/TcpDiscoveryMessageSerializer.java         |   4 -
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |   8 --
 .../messages/TcpDiscoveryCustomEventMessage.java   |  68 +---------
 .../NodeSecurityContextPropagationTest.java        |   3 +-
 .../apache/ignite/spi/MessagesPluginProvider.java  |  20 +--
 .../tcp/DiscoveryUnmarshalVulnerabilityTest.java   | 146 ++++++++++++++++-----
 .../ignite/spi/discovery/tcp/ExploitMessage.java   |  52 ++++++++
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java     |  17 ++-
 .../apache/ignite/testframework/GridTestUtils.java |  30 +++++
 14 files changed, 303 insertions(+), 308 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 437a9b254a0..9d60202d7e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1326,10 +1326,7 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
             MessageFactoryProvider f = compType.messageFactory();
 
             if (f != null) {
-                if (f instanceof AbstractMarshallableMessageFactoryProvider) {
-                    
((AbstractMarshallableMessageFactoryProvider)f).init(ctx.marshallerContext().jdkMarshaller(),
-                        ctx.marshaller(), resolvedClsLdr);
-                }
+                initProvider(f, resolvedClsLdr);
 
                 compMsgs.add(f);
             }
@@ -1340,8 +1337,11 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
         if (discoSpi instanceof IgniteDiscoverySpi) {
             MessageFactoryProvider discoMsgs = 
((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();
 
-            if (discoMsgs != null)
+            if (discoMsgs != null) {
+                initProvider(discoMsgs, resolvedClsLdr);
+
                 compMsgs.add(discoMsgs);
+            }
         }
 
         if (!compMsgs.isEmpty())
@@ -1350,6 +1350,19 @@ public class IgniteKernal implements IgniteEx, 
Externalizable {
         msgFactory = new IgniteMessageFactoryImpl(msgs);
     }
 
+    /**
+     * Re-init {@link AbstractMarshallableMessageFactoryProvider} with a 
proper marshaller and classloader.
+     *
+     * @param factoryProvider Message factory provider.
+     * @param clsLdr Class loader.
+     */
+    private void initProvider(MessageFactoryProvider factoryProvider, 
ClassLoader clsLdr) {
+        if (factoryProvider instanceof 
AbstractMarshallableMessageFactoryProvider) {
+            
((AbstractMarshallableMessageFactoryProvider)factoryProvider).init(ctx.marshallerContext().jdkMarshaller(),
+                ctx.marshaller(), clsLdr);
+        }
+    }
+
     /**
      * @return Ignite security processor. See {@link IgniteSecurity} for 
details.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
index 7c8269bd14d..e9d33b8433c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
@@ -18,34 +18,20 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.Nullable;
 
 /** Custom message wrapper with ID of security subject that initiated the 
current message. */
-public class SecurityAwareCustomMessageWrapper implements 
DiscoverySpiCustomMessage, MarshallableMessage {
+public class SecurityAwareCustomMessageWrapper implements 
DiscoverySpiCustomMessage {
     /** Security subject ID. */
     @Order(0)
     UUID secSubjId;
 
     /** Original message. */
-    private DiscoveryCustomMessage delegate;
-
-    /** */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
     @Order(1)
-    Message msg;
-
-    /** Serialized message bytes. */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
-    @Order(2)
-    byte[] msgBytes;
+    DiscoveryCustomMessage delegate;
 
     /** Default constructor for {@link MessageFactory}. */
     public SecurityAwareCustomMessageWrapper() {
@@ -56,9 +42,6 @@ public class SecurityAwareCustomMessageWrapper implements 
DiscoverySpiCustomMess
     public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, 
UUID secSubjId) {
         this.delegate = delegate;
         this.secSubjId = secSubjId;
-
-        if (delegate instanceof Message)
-            msg = (Message)delegate;
     }
 
     /** Gets security Subject ID. */
@@ -80,7 +63,7 @@ public class SecurityAwareCustomMessageWrapper implements 
DiscoverySpiCustomMess
      * @return Delegate.
      */
     public DiscoveryCustomMessage delegate() {
-        return msg != null ? (DiscoveryCustomMessage)msg : delegate;
+        return delegate;
     }
 
     /** {@inheritDoc} */
@@ -89,16 +72,4 @@ public class SecurityAwareCustomMessageWrapper implements 
DiscoverySpiCustomMess
 
         return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, 
secSubjId);
     }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
-        if (!(delegate instanceof Message))
-            msgBytes = U.marshal(marsh, delegate);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
-        if (msgBytes != null)
-            delegate = U.unmarshal(marsh, msgBytes, clsLdr);
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f7812b0ebb8..6403805672b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -498,35 +498,28 @@ class ClientImpl extends TcpDiscoveryImpl {
         if (state == STOPPED || state == SEGMENTED || state == STARTING)
             throw new IgniteException("Failed to send custom message: client 
is " + state.name().toLowerCase() + ".");
 
-        try {
-            TcpDiscoveryCustomEventMessage msg;
-
-            DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
+        TcpDiscoveryCustomEventMessage msg;
 
-            if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
-                msg = new 
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
-            else
-                msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), 
evt);
+        DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
 
-            Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
-                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> 
getLocalNodeId().toString())
-                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.CONSISTENT_ID),
-                    () -> locNode.consistentId().toString())
-                .addTag(SpanTags.MESSAGE_CLASS, () -> 
customMsg.getClass().getSimpleName())
-                .addLog(() -> "Created");
+        if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
+            msg = new 
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
+        else
+            msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
 
-            // This root span will be parent both from local and remote nodes.
-            
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+        Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
+            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> 
getLocalNodeId().toString())
+            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
+                () -> locNode.consistentId().toString())
+            .addTag(SpanTags.MESSAGE_CLASS, () -> 
customMsg.getClass().getSimpleName())
+            .addLog(() -> "Created");
 
-            msg.prepareMarshal(spi.marshaller());
+        // This root span will be parent both from local and remote nodes.
+        msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
 
-            sockWriter.sendMessage(msg);
+        sockWriter.sendMessage(msg);
 
-            rootSpan.addLog(() -> "Sent").end();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal custom event: " + 
evt, e);
-        }
+        rootSpan.addLog(() -> "Sent").end();
     }
 
     /** {@inheritDoc} */
@@ -2593,17 +2586,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? 
locNode : rmtNodes.get(nodeId);
 
                     if (node != null && node.visible()) {
-                        try {
-                            msg.finishUnmarshal(spi.marshaller(), 
U.resolveClassLoader(spi.ignite().configuration()));
-
-                            DiscoverySpiCustomMessage msgObj = msg.message();
-
-                            notifyDiscovery(
-                                EVT_DISCOVERY_CUSTOM_EVT, topVer, node, 
allVisibleNodes(), msgObj, msg.spanContainer());
-                        }
-                        catch (Throwable e) {
-                            U.error(log, "Failed to unmarshal discovery custom 
message.", e);
-                        }
+                        notifyDiscovery(
+                            EVT_DISCOVERY_CUSTOM_EVT, topVer, node, 
allVisibleNodes(), msg.message(), msg.spanContainer());
                     }
                     else if (log.isDebugEnabled())
                         log.debug("Received metrics from unknown node: " + 
nodeId);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index cc87595684b..17bda88dcbc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1017,35 +1017,28 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
-        try {
-            TcpDiscoveryCustomEventMessage msg;
+        TcpDiscoveryCustomEventMessage msg;
 
-            DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
+        DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
 
-            if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
-                msg = new 
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
-            else
-                msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), 
evt);
+        if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
+            msg = new 
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
+        else
+            msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
 
-            Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
-                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> 
getLocalNodeId().toString())
-                .addTag(SpanTags.tag(SpanTags.EVENT_NODE, 
SpanTags.CONSISTENT_ID),
-                    () -> locNode.consistentId().toString())
-                .addTag(SpanTags.MESSAGE_CLASS, () -> 
customMsg.getClass().getSimpleName())
-                .addLog(() -> "Created");
+        Span rootSpan = 
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
+            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> 
getLocalNodeId().toString())
+            .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
+                () -> locNode.consistentId().toString())
+            .addTag(SpanTags.MESSAGE_CLASS, () -> 
customMsg.getClass().getSimpleName())
+            .addLog(() -> "Created");
 
-            // This root span will be parent both from local and remote nodes.
-            
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+        // This root span will be parent both from local and remote nodes.
+        msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
 
-            msg.prepareMarshal(spi.marshaller());
-
-            msgWorker.addMessage(msg);
+        msgWorker.addMessage(msg);
 
-            rootSpan.addLog(() -> "Sent").end();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSpiException("Failed to marshal custom event: " + 
evt, e);
-        }
+        rootSpan.addLog(() -> "Sent").end();
     }
 
     /** {@inheritDoc} */
@@ -2556,9 +2549,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryCustomEventMessage msg0 = 
(TcpDiscoveryCustomEventMessage)msg;
 
                 msg = new TcpDiscoveryCustomEventMessage(msg0);
-
-                // We shoulgn't store deserialized message in the queue 
because of msg is transient.
-                ((TcpDiscoveryCustomEventMessage)msg).clearMessage();
             }
 
             synchronized (msgs) {
@@ -6106,40 +6096,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                             processCustomMessage(msg, waitForNotification);
                         }
                     }
-
-                    msg.clearMessage();
                 }
                 else {
                     addMessage(new 
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
 
-                    DiscoverySpiCustomMessage msgObj = null;
+                    DiscoverySpiCustomMessage customMsg = msg.message();
 
-                    try {
-                        msg.finishUnmarshal(spi.marshaller(), 
U.resolveClassLoader(spi.ignite().configuration()));
-
-                        msgObj = msg.message();
-                    }
-                    catch (Throwable e) {
-                        U.error(log, "Failed to unmarshal discovery custom 
message.", e);
-                    }
-
-                    if (msgObj != null) {
-                        DiscoverySpiCustomMessage nextMsg = 
msgObj.ackMessage();
+                    if (customMsg != null) {
+                        DiscoverySpiCustomMessage nextMsg = 
customMsg.ackMessage();
 
                         if (nextMsg != null) {
-                            try {
-                                TcpDiscoveryCustomEventMessage ackMsg = new 
TcpDiscoveryCustomEventMessage(
-                                    getLocalNodeId(), nextMsg);
-
-                                ackMsg.topologyVersion(msg.topologyVersion());
+                            TcpDiscoveryCustomEventMessage ackMsg = new 
TcpDiscoveryCustomEventMessage(
+                                getLocalNodeId(), nextMsg);
 
-                                ackMsg.prepareMarshal(spi.marshaller());
+                            ackMsg.topologyVersion(msg.topologyVersion());
 
-                                processCustomMessage(ackMsg, 
waitForNotification);
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.error(log, "Failed to marshal discovery 
custom message.", e);
-                            }
+                            processCustomMessage(ackMsg, waitForNotification);
                         }
                     }
                 }
@@ -6165,8 +6137,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                     notifyDiscoveryListener(msg, waitForNotification);
                 }
 
-                msg.clearMessage();
-
                 if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
             }
@@ -6303,16 +6273,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (node == null)
                     return;
 
-                DiscoverySpiCustomMessage msgObj;
-
-                try {
-                    msg.finishUnmarshal(spi.marshaller(), 
U.resolveClassLoader(spi.ignite().configuration()));
-
-                    msgObj = msg.message();
-                }
-                catch (Throwable t) {
-                    throw new IgniteException("Failed to unmarshal discovery 
custom message: " + msg, t);
-                }
+                DiscoverySpiCustomMessage customMsg = msg.message();
 
                 IgniteFuture<?> fut = lsnr.onDiscovery(
                     new DiscoveryNotification(
@@ -6321,13 +6282,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                         node,
                         snapshot,
                         hist,
-                        msgObj,
+                        customMsg,
                         msg.spanContainer())
                     );
 
                 notifiedDiscovery.set(true);
 
-                if (waitForNotification || msgObj.isMutable()) {
+                if (waitForNotification || customMsg.isMutable()) {
                     blockingSectionBegin();
 
                     try {
@@ -6337,15 +6298,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                         blockingSectionEnd();
                     }
                 }
-
-                if (msgObj.isMutable()) {
-                    try {
-                        msg.prepareMarshal(spi.marshaller());
-                    }
-                    catch (Throwable t) {
-                        throw new IgniteException("Failed to marshal mutable 
discovery message: " + msgObj, t);
-                    }
-                }
             }
         }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 30e9b1b73f8..6be18c60c40 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -62,19 +62,9 @@ public class TcpDiscoveryIoSession {
     /** Size for an intermediate buffer for serializing discovery messages. */
     private static final int MSG_BUFFER_SIZE = 100;
 
-    /** Leading byte for messages use {@link JdkMarshaller} for serialization. 
*/
-    // TODO: remove these flags after refactoring all discovery messages.
-    static final byte JAVA_SERIALIZATION = (byte)1;
-
-    /** Leading byte for messages use {@link MessageSerializer} for 
serialization. */
-    static final byte MESSAGE_SERIALIZATION = (byte)2;
-
     /** */
     final TcpDiscoverySpi spi;
 
-    /** Loads discovery messages classes during java deserialization. */
-    private final ClassLoader clsLdr;
-
     /** */
     private final Socket sock;
 
@@ -104,8 +94,6 @@ public class TcpDiscoveryIoSession {
         this.sock = sock;
         this.spi = spi;
 
-        clsLdr = U.resolveClassLoader(spi.ignite().configuration());
-
         msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
 
         msgWriter = new DirectMessageWriter(spi.messageFactory());
@@ -130,17 +118,7 @@ public class TcpDiscoveryIoSession {
      * @throws IgniteCheckedException If serialization fails.
      */
     void writeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
-        if (!(msg instanceof Message)) {
-            out.write(JAVA_SERIALIZATION);
-
-            U.marshal(spi.marshaller(), msg, out);
-
-            return;
-        }
-
         try {
-            out.write(MESSAGE_SERIALIZATION);
-
             serializeMessage((Message)msg, out);
 
             out.flush();
@@ -162,21 +140,23 @@ public class TcpDiscoveryIoSession {
      * @throws IgniteCheckedException If deserialization fails.
      */
     <T> T readMessage() throws IgniteCheckedException, IOException {
-        byte serMode = (byte)in.read();
+        try {
+            byte b0 = (byte)in.read();
+            byte b1 = (byte)in.read();
 
-        if (JAVA_SERIALIZATION == serMode)
-            return U.unmarshal(spi.marshaller(), in, clsLdr);
+            short msgType = makeMessageType(b0, b1);
 
-        try {
-            if (MESSAGE_SERIALIZATION != serMode) {
-                detectSslAlert(serMode, in);
+            Message msg;
 
-                // IOException type is important for ServerImpl. It may search 
the cause (X.hasCause).
-                // The connection error processing behavior depends on it.
-                throw new IOException("Received unexpected byte while reading 
discovery message: " + serMode);
+            try {
+                msg = spi.messageFactory().create(msgType);
             }
+            catch (IgniteException e) {
+                detectSslAlert(b0, b1, in);
 
-            Message msg = 
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
+                // 'Invalid message type' should not be lost.
+                throw e;
+            }
 
             msgReader.reset();
             msgReader.setBuffer(msgBuf);
@@ -273,12 +253,13 @@ public class TcpDiscoveryIoSession {
      * See handling {@code StreamCorruptedException} in {@link #readMessage()}.
      * Keeps logic similar to {@link 
java.io.ObjectInputStream#readStreamHeader}.
      */
-    private void detectSslAlert(byte firstByte, InputStream in) throws 
IOException {
+    private void detectSslAlert(byte b0, byte b1, InputStream in) throws 
IOException {
         byte[] hdr = new byte[4];
-        hdr[0] = firstByte;
-        int read = in.readNBytes(hdr, 1, 3);
+        hdr[0] = b0;
+        hdr[1] = b1;
+        int read = in.readNBytes(hdr, 2, 2);
 
-        if (read < 3)
+        if (read < 2)
             throw new EOFException();
 
         String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], 
hdr[3]);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
index 1c9e1bcc69c..8c871f9e2eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -59,9 +58,6 @@ class TcpDiscoveryMessageSerializer extends 
TcpDiscoveryIoSession {
      * @throws IOException If serialization fails.
      */
     byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws 
IgniteCheckedException, IOException {
-        if (!(msg instanceof Message))
-            return U.marshal(spi.marshaller(), msg);
-
         try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
             serializeMessage((Message)msg, out);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2fa700d9c7f..903524c8aa9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -71,7 +71,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
@@ -1705,13 +1704,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
         try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
             OutputStream out = sock.getOutputStream();
 
-            // Write Ignite header without leading byte.
-            if (msg != null) {
-                byte mode = msg instanceof Message ? 
TcpDiscoveryIoSession.MESSAGE_SERIALIZATION : 
TcpDiscoveryIoSession.JAVA_SERIALIZATION;
-
-                out.write(mode);
-            }
-
             out.write(data);
 
             out.flush();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index e4ed7cafb8b..09784f49661 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -19,34 +19,21 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.util.Objects;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * Wrapper for custom message.
+ * Wrapper for {@link DiscoveryCustomMessage}.
  */
 @TcpDiscoveryRedirectToClient
 @TcpDiscoveryEnsureDelivery
 public class TcpDiscoveryCustomEventMessage extends 
TcpDiscoveryAbstractTraceableMessage {
     /** */
-    private volatile DiscoverySpiCustomMessage msg;
-
-    /** Serialized message bytes. */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
     @Order(0)
-    volatile @Nullable byte[] msgBytes;
-
-    /** {@link Message} representation of original message. */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
-    @Order(1)
-    volatile @Nullable Message serMsg;
+    DiscoverySpiCustomMessage msg;
 
     /**
      * Constructor for {@link MessageFactory}.
@@ -72,18 +59,9 @@ public class TcpDiscoveryCustomEventMessage extends 
TcpDiscoveryAbstractTraceabl
     public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) {
         super(msg);
 
-        msgBytes = msg.msgBytes;
-        serMsg = msg.serMsg;
         this.msg = msg.msg;
     }
 
-    /**
-     * Clear deserialized form of wrapped message.
-     */
-    public void clearMessage() {
-        msg = null;
-    }
-
     /**
      * @return Original message.
      */
@@ -91,49 +69,11 @@ public class TcpDiscoveryCustomEventMessage extends 
TcpDiscoveryAbstractTraceabl
         return msg;
     }
 
-    /**
-     * Prepare message for serialization.
-     *
-     * @param marsh Marshaller.
-     */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
-    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
-        super.prepareMarshal(marsh);
-
-        if (msg instanceof Message)
-            serMsg = (Message)msg;
-        else {
-            if (msg != null)
-                msgBytes = U.marshal(marsh, msg);
-        }
-    }
-
-    /**
-     * Finish deserialization.
-     *
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     */
-    // TODO: Should be removed in 
https://issues.apache.org/jira/browse/IGNITE-27627
-    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) 
throws IgniteCheckedException {
-        super.finishUnmarshal(marsh, ldr);
-
-        if (msg != null)
-            return;
-
-        if (serMsg != null)
-            msg = (DiscoverySpiCustomMessage)serMsg;
-        else {
-            if (msgBytes != null)
-                msg = U.unmarshal(marsh, msgBytes, ldr);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public boolean equals(Object obj) {
         return super.equals(obj) &&
             obj instanceof TcpDiscoveryCustomEventMessage &&
-            
Objects.equals(((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(), 
verifierNodeId());
+            
Objects.equals(((TcpDiscoveryAbstractMessage)obj).verifierNodeId(), 
verifierNodeId());
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
index a900dc423b2..8619509d5cd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -53,7 +53,6 @@ import org.junit.Test;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -185,7 +184,7 @@ public class NodeSecurityContextPropagationTest extends 
GridCommonAbstractTest {
             Object unwrappedMsg = msg;
 
             if (msg instanceof TcpDiscoveryCustomEventMessage) {
-                DiscoverySpiCustomMessage customMsg = getFieldValue(msg, 
"serMsg");
+                DiscoverySpiCustomMessage customMsg = 
((TcpDiscoveryCustomEventMessage)msg).message();
 
                 assert customMsg instanceof SecurityAwareCustomMessageWrapper;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java 
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
index bda1aebcd22..865d9ba1644 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
@@ -26,9 +26,10 @@ import org.apache.ignite.plugin.ExtensionRegistry;
 import org.apache.ignite.plugin.PluginContext;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 
+import static org.apache.ignite.testframework.GridTestUtils.loadSerializer;
+
 /**
  * Plugin provider for registering test messages in the communication and 
discovery protocols.
  */
@@ -52,7 +53,7 @@ public class MessagesPluginProvider extends 
AbstractTestPluginProvider {
                     }
                 };
 
-                f.register(directType, msgSupp, loadSerializer(msg));
+                f.register(directType, msgSupp, loadSerializer(msg, null, 
null));
 
                 directType++;
             }
@@ -75,19 +76,6 @@ public class MessagesPluginProvider extends 
AbstractTestPluginProvider {
         // Register messages into the discovery protocol.
         TestTcpDiscoverySpi discoSpi = 
(TestTcpDiscoverySpi)ctx.igniteConfiguration().getDiscoverySpi();
 
-        discoSpi.messageFactory(msgFactoryProvider);
-    }
-
-    /** */
-    private MessageSerializer<? extends Message> loadSerializer(Class<? 
extends Message> msgCls) {
-        try {
-            Class<?> serCls = U.gridClassLoader()
-                .loadClass(msgCls.getPackage().getName() + "." + 
msgCls.getSimpleName() + "Serializer");
-
-            return (MessageSerializer<? extends Message>)U.newInstance(serCls);
-        }
-        catch (Exception e) {
-            throw new RuntimeException("Unable to find serializer for message: 
" + msgCls, e);
-        }
+        discoSpi.messageFactory(msgFactoryProvider, ctx.igniteConfiguration());
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
index b8a82d52cc9..830e9aa27e4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
@@ -24,14 +24,23 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.Socket;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.CoreMessagesProvider;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import 
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -39,18 +48,19 @@ import org.junit.Test;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_BLACKLIST;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_WHITELIST;
+import static org.apache.ignite.testframework.GridTestUtils.loadSerializer;
 
 /**
  * Tests for whitelist and blacklist to avoid deserialization vulnerability.
  */
 @WithSystemProperty(key = IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION, 
value = "false")
 public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest {
-    /** Marshaller. */
-    private static final JdkMarshaller MARSH = Marshallers.jdk();
-
     /** Shared value. */
     private static final AtomicBoolean SHARED = new AtomicBoolean();
 
+    /** */
+    private LogListener lsnr;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -63,6 +73,33 @@ public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest
         IgniteUtils.clearClassCache();
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        MessageFactoryProvider msgFactoryProvider = new 
AbstractMarshallableMessageFactoryProvider() {
+            @Override public void registerAll(MessageFactory factory) {
+                factory.register(
+                    CoreMessagesProvider.MAX_MESSAGE_ID + 1,
+                    ExploitMessage::new,
+                    new MessageSerializerWrapper(this));
+            }
+        };
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+        discoSpi.messageFactory(msgFactoryProvider, cfg);
+
+        lsnr = LogListener.matches("Invalid message type").build();
+
+        return cfg.setDiscoverySpi(discoSpi)
+            .setGridLogger(new ListeningTestLogger(log, lsnr));
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -132,36 +169,33 @@ public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest
         testExploit(false);
     }
 
+    /** */
+    @Test
+    public void testExploitDirectly() throws Exception {
+        startGrid();
+
+        attack(Marshallers.jdk().marshal(new Exploit()));
+
+        assertTrue("Invalid message type MUST occur", 
lsnr.check(getTestTimeout()));
+        assertFalse(SHARED.get());
+    }
+
     /**
      * @param positive Positive.
      */
     private void testExploit(boolean positive) throws Exception {
-        try {
-            startGrid();
+        startGrid();
 
-            attack(marshal(new Exploit()));
+        attack(serializedMessage());
 
-            boolean res = GridTestUtils.waitForCondition(new 
GridAbsPredicate() {
-                @Override public boolean apply() {
-                    return SHARED.get();
-                }
-            }, 3000L);
+        boolean res = GridTestUtils.waitForCondition(SHARED::get, 5000);
 
-            if (positive)
-                assertTrue(res);
-            else
-                assertFalse(res);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
+        if (positive)
+            assertTrue(res);
+        else
+            assertFalse(res);
 
-    /**
-     * @param obj Object.
-     */
-    private static byte[] marshal(Object obj) throws IgniteCheckedException {
-        return MARSH.marshal(obj);
+        assertFalse("Invalid message type ocurred", lsnr.check());
     }
 
     /**
@@ -175,13 +209,26 @@ public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest
             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
         ) {
             oos.write(U.IGNITE_HEADER);
-            oos.write((byte)1); // Flag for java serialization.
             oos.write(data);
         }
     }
 
     /** */
-    private static class Exploit implements Serializable {
+    private byte[] serializedMessage() {
+        ByteBuffer buf = ByteBuffer.allocate(4096);
+
+        MessageFactory msgFactory = 
((TcpDiscoverySpi)grid().configuration().getDiscoverySpi()).messageFactory();
+
+        DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+        writer.setBuffer(buf);
+
+        writer.writeMessage(new ExploitMessage(new Exploit()));
+
+        return buf.flip().compact().array();
+    }
+
+    /** */
+    static class Exploit implements Serializable {
         /**
          * @param is Input stream.
          */
@@ -189,4 +236,43 @@ public class DiscoveryUnmarshalVulnerabilityTest extends 
GridCommonAbstractTest
             SHARED.set(true);
         }
     }
+
+    /** */
+    private static class MessageSerializerWrapper implements 
MessageSerializer<ExploitMessage> {
+        /** */
+        private final AbstractMarshallableMessageFactoryProvider provider;
+
+        /** */
+        private final AtomicBoolean init = new AtomicBoolean(true);
+
+        /** */
+        private MessageSerializer<ExploitMessage> serde;
+
+        /** */
+        private 
MessageSerializerWrapper(AbstractMarshallableMessageFactoryProvider provider) {
+            this.provider = provider;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ExploitMessage msg, MessageWriter 
writer) {
+            initIfNecessary();
+
+            return serde.writeTo(msg, writer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ExploitMessage msg, MessageReader 
reader) {
+            initIfNecessary();
+
+            return serde.readFrom(msg, reader);
+        }
+
+        /** */
+        private void initIfNecessary() {
+            if (init.get() && init.compareAndSet(true, false))
+                serde = loadSerializer(ExploitMessage.class,
+                    U.field(provider, "dfltMarsh"),
+                    U.field(provider, "resolvedClsLdr"));
+        }
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
new file mode 100644
index 00000000000..182f91f8262
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.marshaller.Marshaller;
+import 
org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest.Exploit;
+
+/** */
+class ExploitMessage implements MarshallableMessage {
+    /** */
+    @Order(0)
+    byte[] exploitBytes;
+
+    /** */
+    private Exploit exploit;
+
+    /** */
+    public ExploitMessage() {}
+
+    /** */
+    public ExploitMessage(Exploit exploit) {
+        this.exploit = exploit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(Marshaller marsh) throws 
IgniteCheckedException {
+        exploitBytes = marsh.marshal(exploit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(Marshaller marsh, ClassLoader 
clsLdr) throws IgniteCheckedException {
+        exploit = marsh.unmarshal(exploitBytes, clsLdr);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 2aaabee1aeb..cf6affb8b35 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
 import java.io.IOException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.CoreMessagesProvider;
 import 
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
@@ -55,6 +56,9 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi 
implements IgniteDiscov
     /** */
     private MessageFactory msgFactory;
 
+    /** */
+    private MessageFactoryProvider provider;
+
     /** {@inheritDoc} */
     @Override protected void writeMessage(TcpDiscoveryIoSession ses, 
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
         IgniteCheckedException {
@@ -116,16 +120,23 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi 
implements IgniteDiscov
      * Otherwise, this method call will take no effect.
      *
      * @param msgFactoryProvider Discovery messages factory provider.
+     * @param cfg Ignite configuration.
      */
-    public void messageFactory(MessageFactoryProvider msgFactoryProvider) {
+    public void messageFactory(MessageFactoryProvider msgFactoryProvider, 
IgniteConfiguration cfg) {
+        provider = msgFactoryProvider;
         assert !started();
 
-        this.msgFactory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[] {
-            new CoreMessagesProvider(jdk(), jdk(), 
U.resolveClassLoader(ignite().configuration())),
+        msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] 
{
+            new CoreMessagesProvider(jdk(), jdk(), U.resolveClassLoader(cfg)),
             msgFactoryProvider
         });
     }
 
+    /** {@inheritDoc} */
+    @Override public MessageFactoryProvider messageFactoryProvider() {
+        return provider;
+    }
+
     /** {@inheritDoc} */
     @Override protected void initLocalNode(int srvPort, boolean 
addExtAddrAttr) {
         if (msgFactory != null)
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 373ac47f9f4..5896f7df677 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -94,6 +94,7 @@ import 
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.MarshallableMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -121,7 +122,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
 import org.apache.ignite.spi.discovery.DiscoveryNotification;
@@ -138,6 +141,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_HOME;
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.partitionFileName;
 import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.nodeIds;
+import static org.apache.ignite.marshaller.Marshallers.jdk;
 import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
 import static org.apache.ignite.ssl.SslContextFactory.DFLT_SSL_PROTOCOL;
 import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
@@ -2628,4 +2632,30 @@ public final class GridTestUtils {
 
         setFieldValue(nioSrvr, "skipRead", skip);
     }
+
+    /** */
+    public static <T extends Message> MessageSerializer<T> 
loadSerializer(Class<? extends Message> msgCls,
+        @Nullable Marshaller dfltMarsh, @Nullable ClassLoader dfltClsLdr) {
+        try {
+            boolean isMarshallable = 
MarshallableMessage.class.isAssignableFrom(msgCls);
+
+            String clsPref = msgCls.getSimpleName() + (isMarshallable ? 
"Marshallable" : "");
+
+            Class<?> serCls = U.gridClassLoader()
+                .loadClass(msgCls.getPackage().getName() + "." + clsPref + 
"Serializer");
+
+            Marshaller marsh = dfltMarsh != null ? dfltMarsh : jdk();
+            ClassLoader cldLdr = dfltClsLdr != null ? dfltClsLdr : 
U.gridClassLoader();
+
+            Object msgSer = isMarshallable ?
+                serCls.getConstructor(Marshaller.class, ClassLoader.class)
+                     .newInstance(marsh, cldLdr) :
+                U.newInstance(serCls);
+
+            return (MessageSerializer<T>)msgSer;
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Unable to find serializer for message: 
" + msgCls, e);
+        }
+    }
 }

Reply via email to