This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 87aa722260d IGNITE-27627 Remove support of java serialization for tcp
discovery messages (#13023)
87aa722260d is described below
commit 87aa722260d2ee798e37b79aed1ac3dd205fb9d5
Author: Ilya Shishkov <[email protected]>
AuthorDate: Fri Apr 24 10:19:17 2026 +0300
IGNITE-27627 Remove support of java serialization for tcp discovery
messages (#13023)
---
.../org/apache/ignite/internal/IgniteKernal.java | 23 +++-
.../SecurityAwareCustomMessageWrapper.java | 35 +----
.../ignite/spi/discovery/tcp/ClientImpl.java | 52 +++-----
.../ignite/spi/discovery/tcp/ServerImpl.java | 100 ++++----------
.../spi/discovery/tcp/TcpDiscoveryIoSession.java | 53 +++-----
.../tcp/TcpDiscoveryMessageSerializer.java | 4 -
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 8 --
.../messages/TcpDiscoveryCustomEventMessage.java | 68 +---------
.../NodeSecurityContextPropagationTest.java | 3 +-
.../apache/ignite/spi/MessagesPluginProvider.java | 20 +--
.../tcp/DiscoveryUnmarshalVulnerabilityTest.java | 146 ++++++++++++++++-----
.../ignite/spi/discovery/tcp/ExploitMessage.java | 52 ++++++++
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 17 ++-
.../apache/ignite/testframework/GridTestUtils.java | 30 +++++
14 files changed, 303 insertions(+), 308 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 437a9b254a0..9d60202d7e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1326,10 +1326,7 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
MessageFactoryProvider f = compType.messageFactory();
if (f != null) {
- if (f instanceof AbstractMarshallableMessageFactoryProvider) {
-
((AbstractMarshallableMessageFactoryProvider)f).init(ctx.marshallerContext().jdkMarshaller(),
- ctx.marshaller(), resolvedClsLdr);
- }
+ initProvider(f, resolvedClsLdr);
compMsgs.add(f);
}
@@ -1340,8 +1337,11 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
if (discoSpi instanceof IgniteDiscoverySpi) {
MessageFactoryProvider discoMsgs =
((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();
- if (discoMsgs != null)
+ if (discoMsgs != null) {
+ initProvider(discoMsgs, resolvedClsLdr);
+
compMsgs.add(discoMsgs);
+ }
}
if (!compMsgs.isEmpty())
@@ -1350,6 +1350,19 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
msgFactory = new IgniteMessageFactoryImpl(msgs);
}
+ /**
+ * Re-init {@link AbstractMarshallableMessageFactoryProvider} with a
proper marshaller and classloader.
+ *
+ * @param factoryProvider Message factory provider.
+ * @param clsLdr Class loader.
+ */
+ private void initProvider(MessageFactoryProvider factoryProvider,
ClassLoader clsLdr) {
+ if (factoryProvider instanceof
AbstractMarshallableMessageFactoryProvider) {
+
((AbstractMarshallableMessageFactoryProvider)factoryProvider).init(ctx.marshallerContext().jdkMarshaller(),
+ ctx.marshaller(), clsLdr);
+ }
+ }
+
/**
* @return Ignite security processor. See {@link IgniteSecurity} for
details.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
index 7c8269bd14d..e9d33b8433c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java
@@ -18,34 +18,20 @@
package org.apache.ignite.internal.managers.discovery;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/** Custom message wrapper with ID of security subject that initiated the
current message. */
-public class SecurityAwareCustomMessageWrapper implements
DiscoverySpiCustomMessage, MarshallableMessage {
+public class SecurityAwareCustomMessageWrapper implements
DiscoverySpiCustomMessage {
/** Security subject ID. */
@Order(0)
UUID secSubjId;
/** Original message. */
- private DiscoveryCustomMessage delegate;
-
- /** */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
@Order(1)
- Message msg;
-
- /** Serialized message bytes. */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
- @Order(2)
- byte[] msgBytes;
+ DiscoveryCustomMessage delegate;
/** Default constructor for {@link MessageFactory}. */
public SecurityAwareCustomMessageWrapper() {
@@ -56,9 +42,6 @@ public class SecurityAwareCustomMessageWrapper implements
DiscoverySpiCustomMess
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate,
UUID secSubjId) {
this.delegate = delegate;
this.secSubjId = secSubjId;
-
- if (delegate instanceof Message)
- msg = (Message)delegate;
}
/** Gets security Subject ID. */
@@ -80,7 +63,7 @@ public class SecurityAwareCustomMessageWrapper implements
DiscoverySpiCustomMess
* @return Delegate.
*/
public DiscoveryCustomMessage delegate() {
- return msg != null ? (DiscoveryCustomMessage)msg : delegate;
+ return delegate;
}
/** {@inheritDoc} */
@@ -89,16 +72,4 @@ public class SecurityAwareCustomMessageWrapper implements
DiscoverySpiCustomMess
return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack,
secSubjId);
}
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- if (!(delegate instanceof Message))
- msgBytes = U.marshal(marsh, delegate);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
- if (msgBytes != null)
- delegate = U.unmarshal(marsh, msgBytes, clsLdr);
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index f7812b0ebb8..6403805672b 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -498,35 +498,28 @@ class ClientImpl extends TcpDiscoveryImpl {
if (state == STOPPED || state == SEGMENTED || state == STARTING)
throw new IgniteException("Failed to send custom message: client
is " + state.name().toLowerCase() + ".");
- try {
- TcpDiscoveryCustomEventMessage msg;
-
- DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
+ TcpDiscoveryCustomEventMessage msg;
- if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
- msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
- else
- msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(),
evt);
+ DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
- Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
- () -> locNode.consistentId().toString())
- .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
- .addLog(() -> "Created");
+ if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
+ msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
+ else
+ msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
- // This root span will be parent both from local and remote nodes.
-
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+ Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
+ () -> locNode.consistentId().toString())
+ .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
+ .addLog(() -> "Created");
- msg.prepareMarshal(spi.marshaller());
+ // This root span will be parent both from local and remote nodes.
+ msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
- sockWriter.sendMessage(msg);
+ sockWriter.sendMessage(msg);
- rootSpan.addLog(() -> "Sent").end();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal custom event: " +
evt, e);
- }
+ rootSpan.addLog(() -> "Sent").end();
}
/** {@inheritDoc} */
@@ -2593,17 +2586,8 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ?
locNode : rmtNodes.get(nodeId);
if (node != null && node.visible()) {
- try {
- msg.finishUnmarshal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
-
- DiscoverySpiCustomMessage msgObj = msg.message();
-
- notifyDiscovery(
- EVT_DISCOVERY_CUSTOM_EVT, topVer, node,
allVisibleNodes(), msgObj, msg.spanContainer());
- }
- catch (Throwable e) {
- U.error(log, "Failed to unmarshal discovery custom
message.", e);
- }
+ notifyDiscovery(
+ EVT_DISCOVERY_CUSTOM_EVT, topVer, node,
allVisibleNodes(), msg.message(), msg.spanContainer());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " +
nodeId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index cc87595684b..17bda88dcbc 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1017,35 +1017,28 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
- try {
- TcpDiscoveryCustomEventMessage msg;
+ TcpDiscoveryCustomEventMessage msg;
- DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
+ DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
- if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
- msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
- else
- msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(),
evt);
+ if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
+ msg = new
TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
+ else
+ msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
- Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
- .addTag(SpanTags.tag(SpanTags.EVENT_NODE,
SpanTags.CONSISTENT_ID),
- () -> locNode.consistentId().toString())
- .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
- .addLog(() -> "Created");
+ Span rootSpan =
tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () ->
getLocalNodeId().toString())
+ .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
+ () -> locNode.consistentId().toString())
+ .addTag(SpanTags.MESSAGE_CLASS, () ->
customMsg.getClass().getSimpleName())
+ .addLog(() -> "Created");
- // This root span will be parent both from local and remote nodes.
-
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
+ // This root span will be parent both from local and remote nodes.
+ msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
- msg.prepareMarshal(spi.marshaller());
-
- msgWorker.addMessage(msg);
+ msgWorker.addMessage(msg);
- rootSpan.addLog(() -> "Sent").end();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal custom event: " +
evt, e);
- }
+ rootSpan.addLog(() -> "Sent").end();
}
/** {@inheritDoc} */
@@ -2556,9 +2549,6 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryCustomEventMessage msg0 =
(TcpDiscoveryCustomEventMessage)msg;
msg = new TcpDiscoveryCustomEventMessage(msg0);
-
- // We shoulgn't store deserialized message in the queue
because of msg is transient.
- ((TcpDiscoveryCustomEventMessage)msg).clearMessage();
}
synchronized (msgs) {
@@ -6106,40 +6096,22 @@ class ServerImpl extends TcpDiscoveryImpl {
processCustomMessage(msg, waitForNotification);
}
}
-
- msg.clearMessage();
}
else {
addMessage(new
TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
- DiscoverySpiCustomMessage msgObj = null;
+ DiscoverySpiCustomMessage customMsg = msg.message();
- try {
- msg.finishUnmarshal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
-
- msgObj = msg.message();
- }
- catch (Throwable e) {
- U.error(log, "Failed to unmarshal discovery custom
message.", e);
- }
-
- if (msgObj != null) {
- DiscoverySpiCustomMessage nextMsg =
msgObj.ackMessage();
+ if (customMsg != null) {
+ DiscoverySpiCustomMessage nextMsg =
customMsg.ackMessage();
if (nextMsg != null) {
- try {
- TcpDiscoveryCustomEventMessage ackMsg = new
TcpDiscoveryCustomEventMessage(
- getLocalNodeId(), nextMsg);
-
- ackMsg.topologyVersion(msg.topologyVersion());
+ TcpDiscoveryCustomEventMessage ackMsg = new
TcpDiscoveryCustomEventMessage(
+ getLocalNodeId(), nextMsg);
- ackMsg.prepareMarshal(spi.marshaller());
+ ackMsg.topologyVersion(msg.topologyVersion());
- processCustomMessage(ackMsg,
waitForNotification);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal discovery
custom message.", e);
- }
+ processCustomMessage(ackMsg, waitForNotification);
}
}
}
@@ -6165,8 +6137,6 @@ class ServerImpl extends TcpDiscoveryImpl {
notifyDiscoveryListener(msg, waitForNotification);
}
- msg.clearMessage();
-
if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
@@ -6303,16 +6273,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (node == null)
return;
- DiscoverySpiCustomMessage msgObj;
-
- try {
- msg.finishUnmarshal(spi.marshaller(),
U.resolveClassLoader(spi.ignite().configuration()));
-
- msgObj = msg.message();
- }
- catch (Throwable t) {
- throw new IgniteException("Failed to unmarshal discovery
custom message: " + msg, t);
- }
+ DiscoverySpiCustomMessage customMsg = msg.message();
IgniteFuture<?> fut = lsnr.onDiscovery(
new DiscoveryNotification(
@@ -6321,13 +6282,13 @@ class ServerImpl extends TcpDiscoveryImpl {
node,
snapshot,
hist,
- msgObj,
+ customMsg,
msg.spanContainer())
);
notifiedDiscovery.set(true);
- if (waitForNotification || msgObj.isMutable()) {
+ if (waitForNotification || customMsg.isMutable()) {
blockingSectionBegin();
try {
@@ -6337,15 +6298,6 @@ class ServerImpl extends TcpDiscoveryImpl {
blockingSectionEnd();
}
}
-
- if (msgObj.isMutable()) {
- try {
- msg.prepareMarshal(spi.marshaller());
- }
- catch (Throwable t) {
- throw new IgniteException("Failed to marshal mutable
discovery message: " + msgObj, t);
- }
- }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
index 30e9b1b73f8..6be18c60c40 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java
@@ -62,19 +62,9 @@ public class TcpDiscoveryIoSession {
/** Size for an intermediate buffer for serializing discovery messages. */
private static final int MSG_BUFFER_SIZE = 100;
- /** Leading byte for messages use {@link JdkMarshaller} for serialization.
*/
- // TODO: remove these flags after refactoring all discovery messages.
- static final byte JAVA_SERIALIZATION = (byte)1;
-
- /** Leading byte for messages use {@link MessageSerializer} for
serialization. */
- static final byte MESSAGE_SERIALIZATION = (byte)2;
-
/** */
final TcpDiscoverySpi spi;
- /** Loads discovery messages classes during java deserialization. */
- private final ClassLoader clsLdr;
-
/** */
private final Socket sock;
@@ -104,8 +94,6 @@ public class TcpDiscoveryIoSession {
this.sock = sock;
this.spi = spi;
- clsLdr = U.resolveClassLoader(spi.ignite().configuration());
-
msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
msgWriter = new DirectMessageWriter(spi.messageFactory());
@@ -130,17 +118,7 @@ public class TcpDiscoveryIoSession {
* @throws IgniteCheckedException If serialization fails.
*/
void writeMessage(TcpDiscoveryAbstractMessage msg) throws
IgniteCheckedException, IOException {
- if (!(msg instanceof Message)) {
- out.write(JAVA_SERIALIZATION);
-
- U.marshal(spi.marshaller(), msg, out);
-
- return;
- }
-
try {
- out.write(MESSAGE_SERIALIZATION);
-
serializeMessage((Message)msg, out);
out.flush();
@@ -162,21 +140,23 @@ public class TcpDiscoveryIoSession {
* @throws IgniteCheckedException If deserialization fails.
*/
<T> T readMessage() throws IgniteCheckedException, IOException {
- byte serMode = (byte)in.read();
+ try {
+ byte b0 = (byte)in.read();
+ byte b1 = (byte)in.read();
- if (JAVA_SERIALIZATION == serMode)
- return U.unmarshal(spi.marshaller(), in, clsLdr);
+ short msgType = makeMessageType(b0, b1);
- try {
- if (MESSAGE_SERIALIZATION != serMode) {
- detectSslAlert(serMode, in);
+ Message msg;
- // IOException type is important for ServerImpl. It may search
the cause (X.hasCause).
- // The connection error processing behavior depends on it.
- throw new IOException("Received unexpected byte while reading
discovery message: " + serMode);
+ try {
+ msg = spi.messageFactory().create(msgType);
}
+ catch (IgniteException e) {
+ detectSslAlert(b0, b1, in);
- Message msg =
spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read()));
+ // 'Invalid message type' should not be lost.
+ throw e;
+ }
msgReader.reset();
msgReader.setBuffer(msgBuf);
@@ -273,12 +253,13 @@ public class TcpDiscoveryIoSession {
* See handling {@code StreamCorruptedException} in {@link #readMessage()}.
* Keeps logic similar to {@link
java.io.ObjectInputStream#readStreamHeader}.
*/
- private void detectSslAlert(byte firstByte, InputStream in) throws
IOException {
+ private void detectSslAlert(byte b0, byte b1, InputStream in) throws
IOException {
byte[] hdr = new byte[4];
- hdr[0] = firstByte;
- int read = in.readNBytes(hdr, 1, 3);
+ hdr[0] = b0;
+ hdr[1] = b1;
+ int read = in.readNBytes(hdr, 2, 2);
- if (read < 3)
+ if (read < 2)
throw new EOFException();
String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2],
hdr[3]);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
index 1c9e1bcc69c..8c871f9e2eb 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -59,9 +58,6 @@ class TcpDiscoveryMessageSerializer extends
TcpDiscoveryIoSession {
* @throws IOException If serialization fails.
*/
byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws
IgniteCheckedException, IOException {
- if (!(msg instanceof Message))
- return U.marshal(spi.marshaller(), msg);
-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
serializeMessage((Message)msg, out);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 2fa700d9c7f..903524c8aa9 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -71,7 +71,6 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -1705,13 +1704,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
OutputStream out = sock.getOutputStream();
- // Write Ignite header without leading byte.
- if (msg != null) {
- byte mode = msg instanceof Message ?
TcpDiscoveryIoSession.MESSAGE_SERIALIZATION :
TcpDiscoveryIoSession.JAVA_SERIALIZATION;
-
- out.write(mode);
- }
-
out.write(data);
out.flush();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index e4ed7cafb8b..09784f49661 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -19,34 +19,21 @@ package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Objects;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
/**
- * Wrapper for custom message.
+ * Wrapper for {@link DiscoveryCustomMessage}.
*/
@TcpDiscoveryRedirectToClient
@TcpDiscoveryEnsureDelivery
public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceableMessage {
/** */
- private volatile DiscoverySpiCustomMessage msg;
-
- /** Serialized message bytes. */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
@Order(0)
- volatile @Nullable byte[] msgBytes;
-
- /** {@link Message} representation of original message. */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
- @Order(1)
- volatile @Nullable Message serMsg;
+ DiscoverySpiCustomMessage msg;
/**
* Constructor for {@link MessageFactory}.
@@ -72,18 +59,9 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) {
super(msg);
- msgBytes = msg.msgBytes;
- serMsg = msg.serMsg;
this.msg = msg.msg;
}
- /**
- * Clear deserialized form of wrapped message.
- */
- public void clearMessage() {
- msg = null;
- }
-
/**
* @return Original message.
*/
@@ -91,49 +69,11 @@ public class TcpDiscoveryCustomEventMessage extends
TcpDiscoveryAbstractTraceabl
return msg;
}
- /**
- * Prepare message for serialization.
- *
- * @param marsh Marshaller.
- */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
- @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- super.prepareMarshal(marsh);
-
- if (msg instanceof Message)
- serMsg = (Message)msg;
- else {
- if (msg != null)
- msgBytes = U.marshal(marsh, msg);
- }
- }
-
- /**
- * Finish deserialization.
- *
- * @param marsh Marshaller.
- * @param ldr Class loader.
- */
- // TODO: Should be removed in
https://issues.apache.org/jira/browse/IGNITE-27627
- @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr)
throws IgniteCheckedException {
- super.finishUnmarshal(marsh, ldr);
-
- if (msg != null)
- return;
-
- if (serMsg != null)
- msg = (DiscoverySpiCustomMessage)serMsg;
- else {
- if (msgBytes != null)
- msg = U.unmarshal(marsh, msgBytes, ldr);
- }
- }
-
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
return super.equals(obj) &&
obj instanceof TcpDiscoveryCustomEventMessage &&
-
Objects.equals(((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(),
verifierNodeId());
+
Objects.equals(((TcpDiscoveryAbstractMessage)obj).verifierNodeId(),
verifierNodeId());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
index a900dc423b2..8619509d5cd 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java
@@ -53,7 +53,6 @@ import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
-import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -185,7 +184,7 @@ public class NodeSecurityContextPropagationTest extends
GridCommonAbstractTest {
Object unwrappedMsg = msg;
if (msg instanceof TcpDiscoveryCustomEventMessage) {
- DiscoverySpiCustomMessage customMsg = getFieldValue(msg,
"serMsg");
+ DiscoverySpiCustomMessage customMsg =
((TcpDiscoveryCustomEventMessage)msg).message();
assert customMsg instanceof SecurityAwareCustomMessageWrapper;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
index bda1aebcd22..865d9ba1644 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java
@@ -26,9 +26,10 @@ import org.apache.ignite.plugin.ExtensionRegistry;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+import static org.apache.ignite.testframework.GridTestUtils.loadSerializer;
+
/**
* Plugin provider for registering test messages in the communication and
discovery protocols.
*/
@@ -52,7 +53,7 @@ public class MessagesPluginProvider extends
AbstractTestPluginProvider {
}
};
- f.register(directType, msgSupp, loadSerializer(msg));
+ f.register(directType, msgSupp, loadSerializer(msg, null,
null));
directType++;
}
@@ -75,19 +76,6 @@ public class MessagesPluginProvider extends
AbstractTestPluginProvider {
// Register messages into the discovery protocol.
TestTcpDiscoverySpi discoSpi =
(TestTcpDiscoverySpi)ctx.igniteConfiguration().getDiscoverySpi();
- discoSpi.messageFactory(msgFactoryProvider);
- }
-
- /** */
- private MessageSerializer<? extends Message> loadSerializer(Class<?
extends Message> msgCls) {
- try {
- Class<?> serCls = U.gridClassLoader()
- .loadClass(msgCls.getPackage().getName() + "." +
msgCls.getSimpleName() + "Serializer");
-
- return (MessageSerializer<? extends Message>)U.newInstance(serCls);
- }
- catch (Exception e) {
- throw new RuntimeException("Unable to find serializer for message:
" + msgCls, e);
- }
+ discoSpi.messageFactory(msgFactoryProvider, ctx.igniteConfiguration());
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
index b8a82d52cc9..830e9aa27e4 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java
@@ -24,14 +24,23 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.CoreMessagesProvider;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -39,18 +48,19 @@ import org.junit.Test;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_BLACKLIST;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHALLER_WHITELIST;
+import static org.apache.ignite.testframework.GridTestUtils.loadSerializer;
/**
* Tests for whitelist and blacklist ot avoiding deserialization vulnerability.
*/
@WithSystemProperty(key = IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION,
value = "false")
public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest {
- /** Marshaller. */
- private static final JdkMarshaller MARSH = Marshallers.jdk();
-
/** Shared value. */
private static final AtomicBoolean SHARED = new AtomicBoolean();
+ /** */
+ private LogListener lsnr;
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -63,6 +73,33 @@ public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest
IgniteUtils.clearClassCache();
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ MessageFactoryProvider msgFactoryProvider = new
AbstractMarshallableMessageFactoryProvider() {
+ @Override public void registerAll(MessageFactory factory) {
+ factory.register(
+ CoreMessagesProvider.MAX_MESSAGE_ID + 1,
+ ExploitMessage::new,
+ new MessageSerializerWrapper(this));
+ }
+ };
+
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
+ discoSpi.messageFactory(msgFactoryProvider, cfg);
+
+ lsnr = LogListener.matches("Invalid message type").build();
+
+ return cfg.setDiscoverySpi(discoSpi)
+ .setGridLogger(new ListeningTestLogger(log, lsnr));
+ }
+
/**
* @throws Exception If failed.
*/
@@ -132,36 +169,33 @@ public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest
testExploit(false);
}
+ /** */
+ @Test
+ public void testExploitDirectly() throws Exception {
+ startGrid();
+
+ attack(Marshallers.jdk().marshal(new Exploit()));
+
+ assertTrue("Invalid message type MUST occur",
lsnr.check(getTestTimeout()));
+ assertFalse(SHARED.get());
+ }
+
/**
* @param positive Positive.
*/
private void testExploit(boolean positive) throws Exception {
- try {
- startGrid();
+ startGrid();
- attack(marshal(new Exploit()));
+ attack(serializedMessage());
- boolean res = GridTestUtils.waitForCondition(new
GridAbsPredicate() {
- @Override public boolean apply() {
- return SHARED.get();
- }
- }, 3000L);
+ boolean res = GridTestUtils.waitForCondition(SHARED::get, 5000);
- if (positive)
- assertTrue(res);
- else
- assertFalse(res);
- }
- finally {
- stopAllGrids();
- }
- }
+ if (positive)
+ assertTrue(res);
+ else
+ assertFalse(res);
- /**
- * @param obj Object.
- */
- private static byte[] marshal(Object obj) throws IgniteCheckedException {
- return MARSH.marshal(obj);
+ assertFalse("Invalid message type ocurred", lsnr.check());
}
/**
@@ -175,13 +209,26 @@ public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
) {
oos.write(U.IGNITE_HEADER);
- oos.write((byte)1); // Flag for java serialization.
oos.write(data);
}
}
/** */
- private static class Exploit implements Serializable {
+ private byte[] serializedMessage() {
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+
+ MessageFactory msgFactory =
((TcpDiscoverySpi)grid().configuration().getDiscoverySpi()).messageFactory();
+
+ DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+ writer.setBuffer(buf);
+
+ writer.writeMessage(new ExploitMessage(new Exploit()));
+
+ return buf.flip().compact().array();
+ }
+
+ /** */
+ static class Exploit implements Serializable {
/**
* @param is Input stream.
*/
@@ -189,4 +236,43 @@ public class DiscoveryUnmarshalVulnerabilityTest extends
GridCommonAbstractTest
SHARED.set(true);
}
}
+
+ /** */
+ private static class MessageSerializerWrapper implements
MessageSerializer<ExploitMessage> {
+ /** */
+ private final AbstractMarshallableMessageFactoryProvider provider;
+
+ /** */
+ private final AtomicBoolean init = new AtomicBoolean(true);
+
+ /** */
+ private MessageSerializer<ExploitMessage> serde;
+
+ /** */
+ private
MessageSerializerWrapper(AbstractMarshallableMessageFactoryProvider provider) {
+ this.provider = provider;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ExploitMessage msg, MessageWriter
writer) {
+ initIfNecessary();
+
+ return serde.writeTo(msg, writer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ExploitMessage msg, MessageReader
reader) {
+ initIfNecessary();
+
+ return serde.readFrom(msg, reader);
+ }
+
+ /** */
+ private void initIfNecessary() {
+ if (init.get() && init.compareAndSet(true, false))
+ serde = loadSerializer(ExploitMessage.class,
+ U.field(provider, "dfltMarsh"),
+ U.field(provider, "resolvedClsLdr"));
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
new file mode 100644
index 00000000000..182f91f8262
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.marshaller.Marshaller;
+import
org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest.Exploit;
+
+/** */
+class ExploitMessage implements MarshallableMessage {
+ /** */
+ @Order(0)
+ byte[] exploitBytes;
+
+ /** */
+ private Exploit exploit;
+
+ /** */
+ public ExploitMessage() {}
+
+ /** */
+ public ExploitMessage(Exploit exploit) {
+ this.exploit = exploit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ exploitBytes = marsh.marshal(exploit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ exploit = marsh.unmarshal(exploitBytes, clsLdr);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 2aaabee1aeb..cf6affb8b35 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.IOException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.CoreMessagesProvider;
import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
@@ -55,6 +56,9 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
/** */
private MessageFactory msgFactory;
+ /** */
+ private MessageFactoryProvider provider;
+
/** {@inheritDoc} */
@Override protected void writeMessage(TcpDiscoveryIoSession ses,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
@@ -116,16 +120,23 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
* Otherwise, this method call will take no effect.
*
* @param msgFactoryProvider Discovery messages factory provider.
+ * @param cfg Ignite configuration.
*/
- public void messageFactory(MessageFactoryProvider msgFactoryProvider) {
+ public void messageFactory(MessageFactoryProvider msgFactoryProvider,
IgniteConfiguration cfg) {
+ provider = msgFactoryProvider;
assert !started();
- this.msgFactory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[] {
- new CoreMessagesProvider(jdk(), jdk(),
U.resolveClassLoader(ignite().configuration())),
+ msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]
{
+ new CoreMessagesProvider(jdk(), jdk(), U.resolveClassLoader(cfg)),
msgFactoryProvider
});
}
+ /** {@inheritDoc} */
+ @Override public MessageFactoryProvider messageFactoryProvider() {
+ return provider;
+ }
+
/** {@inheritDoc} */
@Override protected void initLocalNode(int srvPort, boolean
addExtAddrAttr) {
if (msgFactory != null)
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 373ac47f9f4..5896f7df677 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -94,6 +94,7 @@ import
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -121,7 +122,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
@@ -138,6 +141,7 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_HOME;
import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree.partitionFileName;
import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.nodeIds;
+import static org.apache.ignite.marshaller.Marshallers.jdk;
import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
import static org.apache.ignite.ssl.SslContextFactory.DFLT_SSL_PROTOCOL;
import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
@@ -2628,4 +2632,30 @@ public final class GridTestUtils {
setFieldValue(nioSrvr, "skipRead", skip);
}
+
+ /** */
+ public static <T extends Message> MessageSerializer<T>
loadSerializer(Class<? extends Message> msgCls,
+ @Nullable Marshaller dfltMarsh, @Nullable ClassLoader dfltClsLdr) {
+ try {
+ boolean isMarshallable =
MarshallableMessage.class.isAssignableFrom(msgCls);
+
+ String clsPref = msgCls.getSimpleName() + (isMarshallable ?
"Marshallable" : "");
+
+ Class<?> serCls = U.gridClassLoader()
+ .loadClass(msgCls.getPackage().getName() + "." + clsPref +
"Serializer");
+
+ Marshaller marsh = dfltMarsh != null ? dfltMarsh : jdk();
+ ClassLoader cldLdr = dfltClsLdr != null ? dfltClsLdr :
U.gridClassLoader();
+
+ Object msgSer = isMarshallable ?
+ serCls.getConstructor(Marshaller.class, ClassLoader.class)
+ .newInstance(marsh, cldLdr) :
+ U.newInstance(serCls);
+
+ return (MessageSerializer<T>)msgSer;
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unable to find serializer for message:
" + msgCls, e);
+ }
+ }
}