This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fbe2e45748b IGNITE-28627 Handle unknown messages in discovery (#13094)
fbe2e45748b is described below
commit fbe2e45748b92db61df1a8b01d312dc5a4abcda8
Author: Nikolay <[email protected]>
AuthorDate: Thu May 7 12:10:20 2026 +0300
IGNITE-28627 Handle unknown messages in discovery (#13094)
---
.../communication/IgniteMessageFactoryImpl.java | 2 +-
.../communication/UnknownMessageException.java | 49 +++++++
.../plugin/extensions/communication/Message.java | 3 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++-
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 9 ++
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 18 +++
.../tcp/DiscoveryDeserializationExceptionTest.java | 145 +++++++++++++++++++++
.../tcp/DiscoverySerializationExceptionTest.java | 115 ++++++++++++++++
.../spi/discovery/tcp/NotRegisteredMessage.java | 47 +++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 5 +
11 files changed, 407 insertions(+), 7 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index e56365af4b4..5388cefc2c1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -147,7 +147,7 @@ public class IgniteMessageFactoryImpl implements
MessageFactory {
Supplier<Message> supplier =
msgSuppliers[directTypeToIndex(directType)];
if (supplier == null)
- throw new IgniteException("Invalid message type: " + directType);
+ throw new UnknownMessageException(directType);
return supplier.get();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java
new file mode 100644
index 00000000000..bd2aebbcb91
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/UnknownMessageException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * Exception to be thrown when unregistered class serialized or unknown
message deserialized.
+ */
+public class UnknownMessageException extends IgniteException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final String NO_REG_MSG = "No registration for class: %s";
+
+ /** */
+ public static final String INVALID_TYPE_MSG = "Invalid message type: %d";
+
+ /**
+ * @param directType Unknown direct type.
+ */
+ public UnknownMessageException(short directType) {
+ super(String.format(INVALID_TYPE_MSG, directType));
+ }
+
+ /**
+ * @param clazz Unregistered class.
+ */
+ public UnknownMessageException(Class<? extends Message> clazz) {
+ super(String.format(NO_REG_MSG, clazz.getSimpleName()));
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
index ee48f1abdc0..5dae2d9afd3 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteException;
+import
org.apache.ignite.internal.managers.communication.UnknownMessageException;
/**
* Base class for all communication messages.
@@ -68,7 +69,7 @@ public interface Message {
Short type = REGISTRATIONS.get(clazz);
if (type == null)
- throw new IgniteException("No registration for class " +
clazz.getSimpleName());
+ throw new UnknownMessageException(clazz);
return type;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 6403805672b..0d424a7a9dc 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -731,7 +731,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.writeMessage(ses, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(ses,
ackTimeout0);
+ TcpDiscoveryHandshakeResponse res =
spi.readHandshakeResponse(ses, ackTimeout0);
// Convert the addresses once.
Collection<InetSocketAddress> redirectAddrs =
res.redirectAddresses();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5193785a271..c0a9e39ac5a 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -78,6 +78,7 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import
org.apache.ignite.internal.managers.communication.UnknownMessageException;
import
org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
@@ -1488,7 +1489,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Handshake.
spi.writeMessage(ses, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
- TcpDiscoveryHandshakeResponse res = spi.readMessage(ses,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ TcpDiscoveryHandshakeResponse res =
spi.readHandshakeResponse(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
boolean ignore = false;
@@ -3462,7 +3463,8 @@ class ServerImpl extends TcpDiscoveryImpl {
timeoutHelper.nextTimeoutChunk(ackTimeout0));
}
- TcpDiscoveryHandshakeResponse res =
spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0));
+ TcpDiscoveryHandshakeResponse res =
+ spi.readHandshakeResponse(ses,
timeoutHelper.nextTimeoutChunk(ackTimeout0));
if (log.isDebugEnabled())
log.debug("Handshake response: " + res);
@@ -7185,8 +7187,10 @@ class ServerImpl extends TcpDiscoveryImpl {
", err=" + X.cause(e,
ClassNotFoundException.class).getMessage() + ']');
// Always report marshalling errors.
- boolean err = e.hasCause(ObjectStreamException.class)
||
- (nodeAlive(nodeId) && spiStateCopy() == CONNECTED
&& !X.hasCause(e, IOException.class));
+ // Can receive unknown message on handshake. It's ok -
must continue to try find proper port.
+ boolean err = e.hasCause(ObjectStreamException.class)
+ || e.hasCause(UnknownMessageException.class)
+ || (nodeAlive(nodeId) && spiStateCopy() ==
CONNECTED && !X.hasCause(e, IOException.class));
if (err)
LT.error(log, e, "Failed to read message [sock=" +
sock + ", locNodeId=" + locNodeId +
@@ -7220,6 +7224,13 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
}
+ catch (UnknownMessageException e) {
+ if (spi.ignite() instanceof IgniteEx) {
+ FailureProcessor failure =
((IgniteEx)spi.ignite()).context().failure();
+
+ failure.process(new
FailureContext(SYSTEM_WORKER_TERMINATION, e));
+ }
+ }
finally {
if (clientMsgWrk != null) {
if (log.isDebugEnabled())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 6be18c60c40..7b5ccd46ec5 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -34,6 +34,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
+import
org.apache.ignite.internal.managers.communication.UnknownMessageException;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -124,6 +126,10 @@ public class TcpDiscoveryIoSession {
out.flush();
}
catch (Exception e) {
+ // See Message#directType()
+ if (X.hasCause(e, UnknownMessageException.class))
+ throw e;
+
// Keep logic similar to `U.marshal(...)`.
if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
@@ -194,6 +200,9 @@ public class TcpDiscoveryIoSession {
return (T)msg;
}
catch (Exception e) {
+ if (e instanceof UnknownMessageException)
+ throw e;
+
// Keep logic similar to `U.marshal(...)`.
if (e instanceof IgniteCheckedException)
throw (IgniteCheckedException)e;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 9e1b3c60cdc..2ae75539a1c 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -56,6 +56,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import
org.apache.ignite.internal.managers.communication.UnknownMessageException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
@@ -108,6 +109,7 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessag
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -2436,6 +2438,22 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
return marsh;
}
+ /**
+ * On handshake it's OK to receive unknown message.
+ * Wrapping in {@link IgniteCheckedException} allows caller to handle case
properly.
+ */
+ TcpDiscoveryHandshakeResponse readHandshakeResponse(
+ TcpDiscoveryIoSession ses,
+ long timeout
+ ) throws IOException, IgniteCheckedException {
+ try {
+ return readMessage(ses, timeout);
+ }
+ catch (UnknownMessageException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
/** {@inheritDoc} */
@Override public TcpDiscoverySpi setName(String name) {
super.setName(name);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java
new file mode 100644
index 00000000000..cfe047d58e7
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryDeserializationExceptionTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.AbstractFailureHandler;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE;
+import static
org.apache.ignite.internal.managers.communication.UnknownMessageException.INVALID_TYPE_MSG;
+
+/** */
+public class DiscoveryDeserializationExceptionTest extends
GridCommonAbstractTest {
+ /** */
+ private static final int MSG_DIRECT_TYPE = -32764;
+
+ /** */
+ private static final String ERR_MSG = String.format(INVALID_TYPE_MSG,
MSG_DIRECT_TYPE);
+
+ /** */
+ private ListeningTestLogger lsnrLog;
+
+ /** */
+ private volatile int failNodeIdx;
+
+ /** */
+ private volatile CountDownLatch failureHandlerLatch;
+
+ /** */
+ private final LogListener errLsnr = LogListener.matches(ERR_MSG).build();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (getTestIgniteInstanceName(failNodeIdx).equals(igniteInstanceName))
{
+ cfg.setGridLogger(lsnrLog).setFailureHandler(new
AbstractFailureHandler() {
+ @Override protected boolean handle(Ignite ignite,
FailureContext fctx) {
+ assertEquals(FailureType.SYSTEM_WORKER_TERMINATION,
fctx.type());
+ assertEquals(getTestIgniteInstanceName(failNodeIdx),
ignite.configuration().getIgniteInstanceName());
+
+ assertNotNull(fctx.error());
+ assertEquals(ERR_MSG, fctx.error().getMessage());
+
+ failureHandlerLatch.countDown();
+
+ return true;
+ }
+ });
+ }
+ else
+ cfg.setPluginProviders(new NotRegisteredMessageProvider());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ lsnrLog = new ListeningTestLogger(log);
+
+ lsnrLog.registerListener(errLsnr);
+ }
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "fasle")
+ public void testReadExceptionLogged() throws Exception {
+ failureHandlerLatch = new CountDownLatch(1);
+ failNodeIdx = 1;
+
+ startGrids(2);
+
+ // grid0 knows about NotRegisteredMessage.
+ // Expect grid1 fail to read it.
+ grid(0).context().discovery().sendCustomEvent(new
NotRegisteredMessage(""));
+
+ assertTrue("Failure handler must be invoked",
failureHandlerLatch.await(1, TimeUnit.MINUTES));
+ assertTrue("Error must be logged", errLsnr.check(30_000));
+ assertTrue(grid(failNodeIdx).context().invalid());
+ }
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "fasle")
+ public void testReadExceptionLoggedOnClient() throws Exception {
+ failureHandlerLatch = new CountDownLatch(1);
+ failNodeIdx = 3;
+
+ startGrids(2);
+
+ IgniteEx cli = startClientGrid(failNodeIdx);
+
+ // grid0 knows about NotRegisteredMessage.
+ // Expect client node fail to read it.
+ grid(0).context().discovery().sendCustomEvent(new
NotRegisteredMessage(""));
+
+ assertTrue("Error must be logged", errLsnr.check(30_000));
+ }
+
+ /** */
+ public static class NotRegisteredMessageProvider extends
AbstractTestPluginProvider {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return getClass().getName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
+ registry.registerExtension(MessageFactoryProvider.class, (factory)
->
+ factory.register(MSG_DIRECT_TYPE, NotRegisteredMessage::new,
new NotRegisteredMessageSerializer())
+ );
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoverySerializationExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoverySerializationExceptionTest.java
new file mode 100644
index 00000000000..8ced1156e6c
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoverySerializationExceptionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.failure.NoOpFailureHandler;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_THREADS_ON_FAILURE;
+import static
org.apache.ignite.internal.managers.communication.UnknownMessageException.NO_REG_MSG;
+
+/** */
+public class DiscoverySerializationExceptionTest extends
GridCommonAbstractTest {
+ /** */
+ private static final String ERR_MSG = String.format(NO_REG_MSG,
NotRegisteredMessage.class.getSimpleName());
+
+ /** */
+ private ListeningTestLogger lsnrLog;
+
+ /** */
+ private volatile int failNodeIdx;
+
+ /** */
+ private volatile CountDownLatch failureHandlerLatch;
+
+ /** */
+ private final LogListener errLsnr = LogListener.matches(ERR_MSG).build();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // Listening only expected node log.
+ if (getTestIgniteInstanceName(failNodeIdx).equals(igniteInstanceName))
{
+ cfg.setGridLogger(lsnrLog)
+ .setFailureHandler(new NoOpFailureHandler() {
+ @Override protected boolean handle(Ignite ignite,
FailureContext fctx) {
+ assertEquals(FailureType.SYSTEM_WORKER_TERMINATION,
fctx.type());
+ assertEquals(getTestIgniteInstanceName(failNodeIdx),
ignite.configuration().getIgniteInstanceName());
+
+ assertNotNull(fctx.error());
+ assertEquals(ERR_MSG, fctx.error().getMessage());
+
+ failureHandlerLatch.countDown();
+
+ return true;
+ }
+ });
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ lsnrLog = new ListeningTestLogger(log);
+
+ lsnrLog.registerListener(errLsnr);
+ }
+
+ /** */
+ @Test
+ @WithSystemProperty(key = IGNITE_DUMP_THREADS_ON_FAILURE, value = "fasle")
+ public void testWriteExceptionLogged() throws Exception {
+ failNodeIdx = 1;
+ failureHandlerLatch = new CountDownLatch(1);
+
+ startGrids(2);
+
+ grid(failNodeIdx).context().discovery().sendCustomEvent(new
NotRegisteredMessage(""));
+
+ assertTrue("Failure handler must be invoked",
failureHandlerLatch.await(1, TimeUnit.MINUTES));
+ assertTrue("Error must be logged", errLsnr.check(30_000));
+ assertTrue(grid(failNodeIdx).context().invalid());
+ }
+
+ /** */
+ @Test
+ public void testWriteExceptionLoggedOnClient() throws Exception {
+ failNodeIdx = 3;
+
+ startGrids(2);
+
+ startClientGrid(failNodeIdx).context().discovery().sendCustomEvent(new
NotRegisteredMessage(""));
+
+ // Currently, client node doesn't call failure handler.
+ assertTrue("Error must be logged", errLsnr.check(30_000));
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NotRegisteredMessage.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NotRegisteredMessage.java
new file mode 100644
index 00000000000..e595725efd9
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/NotRegisteredMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/** */
+public class NotRegisteredMessage extends DiscoveryCustomMessage {
+ /** */
+ @Order(0)
+ String str;
+
+ /** */
+ public NotRegisteredMessage() {
+ // No-op.
+ }
+
+ /** */
+ public NotRegisteredMessage(String str) {
+ super(IgniteUuid.randomUuid());
+
+ this.str = str;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DiscoverySpiCustomMessage ackMessage() {
+ return null;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 3dcab1b14f8..fa55157bc17 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -29,6 +29,8 @@ import
org.apache.ignite.spi.discovery.LongClientConnectToClusterTest;
import
org.apache.ignite.spi.discovery.datacenter.MultiDataCenterClientRoutingTest;
import
org.apache.ignite.spi.discovery.datacenter.MultiDataCenterDeploymentTest;
import org.apache.ignite.spi.discovery.tcp.DiscoveryClientSocketTest;
+import
org.apache.ignite.spi.discovery.tcp.DiscoveryDeserializationExceptionTest;
+import org.apache.ignite.spi.discovery.tcp.DiscoverySerializationExceptionTest;
import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectSslTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
@@ -192,6 +194,9 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
TcpDiscoveryDeadNodeAddressResolvingTest.class,
+ DiscoverySerializationExceptionTest.class,
+ DiscoveryDeserializationExceptionTest.class,
+
// MDC.
TcpDiscoveryMdcSelfTest.class,
TcpDiscoveryPendingMessageDeliveryMdcTest.class,