This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ce13cd9cccd IGNITE-27848 Use MessageSerializer for
TcpDiscoveryCustomEventMessage (#12742)
ce13cd9cccd is described below
commit ce13cd9cccdf99faeff18e6d67b479bd6008f239
Author: Ilya Shishkov <[email protected]>
AuthorDate: Thu Feb 26 15:52:42 2026 +0300
IGNITE-27848 Use MessageSerializer for TcpDiscoveryCustomEventMessage
(#12742)
---
.../discovery/DiscoveryMessageFactory.java | 7 +++
.../ignite/spi/discovery/tcp/ClientImpl.java | 13 ++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 27 ++++----
.../messages/TcpDiscoveryCustomEventMessage.java | 72 +++++++++++++---------
.../TcpDiscoveryServerOnlyCustomEventMessage.java | 18 ++++--
.../cache/ClientSlowDiscoveryAbstractTest.java | 11 +---
...teMarshallerCacheClientRequestsMappingTest.java | 14 +++--
.../db/IgniteSequentialNodeCrashRecoveryTest.java | 4 +-
.../IncrementalSnapshotJoiningClientTest.java | 6 +-
.../query/schema/IndexWithSameNameTestBase.java | 5 +-
.../spi/discovery/tcp/BlockTcpDiscoverySpi.java | 8 +--
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 5 +-
12 files changed, 113 insertions(+), 77 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index a225c1abec7..707feeac700 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -45,6 +45,8 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRespon
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
@@ -75,6 +77,8 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponseSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessageSerializer;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer;
@@ -113,5 +117,8 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new
TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new,
new TcpDiscoveryNodeAddFinishedMessageSerializer());
factory.register((short)20, TcpDiscoveryJoinRequestMessage::new, new
TcpDiscoveryJoinRequestMessageSerializer());
+ factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new
TcpDiscoveryCustomEventMessageSerializer());
+ factory.register((short)22,
TcpDiscoveryServerOnlyCustomEventMessage::new,
+ new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 77d6926d76c..be87617d45b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -506,11 +506,9 @@ class ClientImpl extends TcpDiscoveryImpl {
DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(evt);
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
- msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt));
+ msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
- msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt));
+ msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(),
evt);
Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
@@ -522,6 +520,8 @@ class ClientImpl extends TcpDiscoveryImpl {
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+ msg.prepareMarshal(spi.marshaller());
+
sockWriter.sendMessage(msg);
rootSpan.addLog(() -> "Sent").end();
@@ -2600,8 +2600,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (node != null && node.visible()) {
try {
- DiscoveryCustomMessage msgObj =
msg.message(spi.marshaller(),
-
U.resolveClassLoader(spi.ignite().configuration()));
+ msg.finishUnmarhal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+
+ DiscoveryCustomMessage msgObj = msg.message();
notifyDiscovery(
EVT_DISCOVERY_CUSTOM_EVT, topVer, node,
allVisibleNodes(), msgObj, msg.spanContainer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2e2ef9ef122..01a3733da98 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1024,11 +1024,9 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(evt);
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
- msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt));
+ msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
- msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
- U.marshal(spi.marshaller(), evt));
+ msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(),
evt);
Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
@@ -1040,6 +1038,8 @@ class ServerImpl extends TcpDiscoveryImpl {
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+ msg.prepareMarshal(spi.marshaller());
+
msgWorker.addMessage(msg);
rootSpan.addLog(() -> "Sent").end();
@@ -6075,7 +6075,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- msg.message(null, msg.messageBytes());
+ msg.clearMessage();
}
else {
addMessage(new
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
@@ -6083,7 +6083,9 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoveryCustomMessage msgObj = null;
try {
- msgObj = msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+ msg.finishUnmarhal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+
+ msgObj = msg.message();
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom
message.", e);
@@ -6095,10 +6097,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (nextMsg != null) {
try {
TcpDiscoveryCustomEventMessage ackMsg = new
TcpDiscoveryCustomEventMessage(
- getLocalNodeId(), nextMsg,
U.marshal(spi.marshaller(), nextMsg));
+ getLocalNodeId(), nextMsg);
ackMsg.topologyVersion(msg.topologyVersion());
+ ackMsg.prepareMarshal(spi.marshaller());
+
processCustomMessage(ackMsg,
waitForNotification);
}
catch (IgniteCheckedException e) {
@@ -6129,8 +6133,7 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscoveryListener(msg, waitForNotification);
}
- // Clear msg field to prevent possible memory leak.
- msg.message(null, msg.messageBytes());
+ msg.clearMessage();
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
@@ -6271,7 +6274,9 @@ class ServerImpl extends TcpDiscoveryImpl {
DiscoveryCustomMessage msgObj;
try {
- msgObj = msg.message(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+ msg.finishUnmarhal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
+
+ msgObj = msg.message();
}
catch (Throwable t) {
throw new IgniteException("Failed to unmarshal discovery
custom message: " + msg, t);
@@ -6303,7 +6308,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msgObj.isMutable()) {
try {
- msg.message(msgObj, U.marshal(spi.marshaller(),
msgObj));
+ msg.prepareMarshal(spi.marshaller());
}
catch (Throwable t) {
throw new IgniteException("Failed to marshal mutable
discovery message: " + msgObj, t);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index 73ac084a3f1..980d12f1277 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -20,40 +20,48 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import
org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
- * Wrapped for custom message.
+ * Wrapper for custom message.
*/
@TcpDiscoveryRedirectToClient
@TcpDiscoveryEnsureDelivery
-public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceableMessage {
+public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceableMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- private transient volatile DiscoveryCustomMessage msg;
+ private volatile DiscoveryCustomMessage msg;
- /** */
- private byte[] msgBytes;
+ /** Serialized message bytes. */
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
+ @Order(6)
+ volatile @Nullable byte[] msgBytes;
+
+ /**
+ * Constructor for {@link DiscoveryMessageFactory}.
+ */
+ public TcpDiscoveryCustomEventMessage() {
+ // No-op.
+ }
/**
* @param creatorNodeId Creator node id.
* @param msg Message.
- * @param msgBytes Serialized message.
*/
- public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable
DiscoveryCustomMessage msg,
- @NotNull byte[] msgBytes) {
+ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId,
DiscoveryCustomMessage msg) {
super(creatorNodeId);
this.msg = msg;
- this.msgBytes = msgBytes;
}
/**
@@ -75,44 +83,47 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
}
/**
- * @return Serialized message.
+ * @return Original message.
*/
- public byte[] messageBytes() {
- return msgBytes;
+ public DiscoveryCustomMessage message() {
+ return msg;
}
/**
- * @param msg Message.
- * @param msgBytes Serialized message.
+ * Prepare message for serialization.
+ *
+ * @param marsh Marshaller.
*/
- public void message(@Nullable DiscoveryCustomMessage msg, @NotNull byte[]
msgBytes) {
- this.msg = msg;
- this.msgBytes = msgBytes;
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
+ public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException
{
+ assert msgBytes == null || msg.isMutable() : "Message bytes are not
null for immutable message: msg =" + msg;
+
+ msgBytes = U.marshal(marsh, msg);
}
/**
+ * Finish deserialization.
+ *
* @param marsh Marshaller.
- * @param ldr Classloader.
- * @return Deserialized message,
- * @throws java.lang.Throwable if unmarshal failed.
+ * @param ldr Class loader.
*/
- @Nullable public DiscoveryCustomMessage message(@NotNull Marshaller marsh,
ClassLoader ldr) throws Throwable {
+ // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
+ public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws
Throwable {
if (msg == null) {
try {
msg = U.unmarshal(marsh, msgBytes, ldr);
}
catch (IgniteCheckedException e) {
// Try to resurrect a message in a case of deserialization
failure
- if (e.getCause() instanceof IncompleteDeserializationException)
- return
((IncompleteDeserializationException)e.getCause()).message();
+ if (e.getCause() instanceof
IncompleteDeserializationException) {
+ msg =
((IncompleteDeserializationException)e.getCause()).message();
+
+ return;
+ }
throw e;
}
-
- assert msg != null;
}
-
- return msg;
}
/** {@inheritDoc} */
@@ -126,4 +137,9 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
@Override public String toString() {
return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super",
super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 21;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
index 15122f1e50b..ece2d61f806 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java
@@ -29,13 +29,23 @@ public class TcpDiscoveryServerOnlyCustomEventMessage
extends TcpDiscoveryCustom
/** */
private static final long serialVersionUID = 0L;
+ /**
+ * Default constructor.
+ */
+ public TcpDiscoveryServerOnlyCustomEventMessage() {
+ // No-op.
+ }
+
/**
* @param creatorNodeId Creator node id.
* @param msg Message.
- * @param msgBytes Serialized message.
*/
- public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId,
@NotNull DiscoveryCustomMessage msg,
- @NotNull byte[] msgBytes) {
- super(creatorNodeId, msg, msgBytes);
+ public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId,
@NotNull DiscoveryCustomMessage msg) {
+ super(creatorNodeId, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 22;
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
index cf1871b1523..7b437382a48 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java
@@ -94,22 +94,17 @@ public class ClientSlowDiscoveryAbstractTest extends
GridCommonAbstractTest {
TcpDiscoveryCustomEventMessage cm =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage delegate;
-
try {
- DiscoveryCustomMessage custMsg = cm.message(marshaller(),
- U.resolveClassLoader(ignite().configuration()));
-
- assertNotNull(custMsg);
+ cm.finishUnmarhal(marshaller(),
U.resolveClassLoader(ignite().configuration()));
- delegate = U.unwrapCustomMessage(custMsg);
+ assertNotNull(cm.message());
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
if (interceptor != null)
- interceptor.apply(delegate);
+ interceptor.apply(U.unwrapCustomMessage(cm.message()));
}
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
index 388d883de1b..5dc0d6bf69e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
@@ -161,10 +161,11 @@ public class
IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAb
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoveryCustomMessage customMsg =
-
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.gridClassLoader());
+ TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(customMsg);
+ evtMsg.finishUnmarhal(marshaller(),
U.gridClassLoader());
+
+ DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(evtMsg.message());
if (delegate instanceof MappingAcceptedMessage) {
MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "item");
@@ -236,10 +237,11 @@ public class
IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAb
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (msg instanceof TcpDiscoveryCustomEventMessage) {
try {
- DiscoveryCustomMessage customMsg =
-
((TcpDiscoveryCustomEventMessage)msg).message(marshaller(),
U.gridClassLoader());
+ TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
+
+ evtMsg.finishUnmarhal(marshaller(),
U.gridClassLoader());
- DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(customMsg);
+ DiscoveryCustomMessage delegate =
U.unwrapCustomMessage(evtMsg.message());
if (delegate instanceof MappingProposedMessage) {
MarshallerMappingItem item =
GridTestUtils.getFieldValue(delegate, "mappingItem");
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
index 724e2e10678..8c3785768c5 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java
@@ -358,7 +358,9 @@ public class IgniteSequentialNodeCrashRecoveryTest extends
GridCommonAbstractTes
DiscoveryCustomMessage msgObj = null;
try {
- msgObj = msg.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
+ msg.finishUnmarhal(marshaller(), U.gridClassLoader());
+
+ msgObj = msg.message();
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.",
e);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
index 3de54f1f3ba..6e5f536f9de 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import
org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -247,10 +246,9 @@ public class IncrementalSnapshotJoiningClientTest extends
AbstractIncrementalSna
TcpDiscoveryCustomEventMessage m =
(TcpDiscoveryCustomEventMessage)msg;
try {
- DiscoveryCustomMessage m0 = m.message(
- marshaller(),
U.resolveClassLoader(ignite().configuration()));
+ m.finishUnmarhal(marshaller(),
U.resolveClassLoader(ignite().configuration()));
- if (U.unwrapCustomMessage(m0) instanceof InitMessage)
+ if (U.unwrapCustomMessage(m.message()) instanceof
InitMessage)
rcvStartSnpReq.countDown();
}
catch (Throwable e) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
index df51e031417..7fd83a98ef7 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java
@@ -313,8 +313,9 @@ public abstract class IndexWithSameNameTestBase extends
GridCommonAbstractTest {
try {
TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage discoCustomMsg =
U.unwrapCustomMessage(evtMsg.message(marshaller(),
- U.gridClassLoader()));
+ evtMsg.finishUnmarhal(marshaller(), U.gridClassLoader());
+
+ DiscoveryCustomMessage discoCustomMsg =
U.unwrapCustomMessage(evtMsg.message());
if (discoCustomMsg instanceof
SchemaFinishDiscoveryMessage) {
SchemaFinishDiscoveryMessage finishMsg =
(SchemaFinishDiscoveryMessage)discoCustomMsg;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
index 59fac925a35..ae34c8e4160 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
@@ -53,19 +53,17 @@ public class BlockTcpDiscoverySpi extends
TestTcpDiscoverySpi {
TcpDiscoveryCustomEventMessage cm =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage custMsg;
-
try {
- custMsg = cm.message(marshaller(),
U.resolveClassLoader(ignite().configuration()));
+ cm.finishUnmarhal(marshaller(), U.gridClassLoader());
- assertNotNull(custMsg);
+ assertNotNull(cm.message());
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
if (clo != null)
- clo.apply(addr, U.unwrapCustomMessage(custMsg));
+ clo.apply(addr, U.unwrapCustomMessage(cm.message()));
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 08012acc3a0..24559bdb1f6 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -2609,8 +2609,9 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
try {
TcpDiscoveryCustomEventMessage evtMsg =
(TcpDiscoveryCustomEventMessage)msg;
- DiscoveryCustomMessage custMsg =
U.unwrapCustomMessage(evtMsg.message(marshaller(),
- U.gridClassLoader()));
+ evtMsg.finishUnmarhal(marshaller(),
U.gridClassLoader());
+
+ DiscoveryCustomMessage custMsg =
U.unwrapCustomMessage(evtMsg.message());
if (custMsg instanceof
StartRoutineAckDiscoveryMessage) {
log.info("Skip message send and stop node: " +
msg);