This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 197a14487a6 IGNITE-28498 Use MessageSerializer for Zookeeper discovery custom messages (#13000)
197a14487a6 is described below
commit 197a14487a69e5fe5656c03b35d7cceb38781b76
Author: Nikita Amelchev <[email protected]>
AuthorDate: Thu Apr 16 11:29:30 2026 +0300
IGNITE-28498 Use MessageSerializer for Zookeeper discovery custom messages (#13000)
---
.../apache/ignite/internal/MessageProcessor.java | 19 +++++++--
.../org/apache/ignite/internal/IgniteKernal.java | 14 +++++--
.../managers/discovery/IgniteDiscoverySpi.java | 6 +++
.../query/h2/twostep/msg/GridH2Null.java | 6 ---
modules/zookeeper/pom.xml | 6 +++
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 7 ++++
.../zk/internal/DiscoveryMessageParser.java | 46 ++++------------------
.../ZkCommunicationErrorResolveFinishMessage.java | 18 +++++++--
.../ZkCommunicationErrorResolveStartMessage.java | 13 +++++-
.../zk/internal/ZkForceNodeFailMessage.java | 16 ++++++--
.../zk/internal/ZkInternalJoinErrorMessage.java | 4 +-
.../discovery/zk/internal/ZkInternalMessage.java | 4 +-
...NoServersMessage.java => ZkMessageFactory.java} | 26 +++++-------
.../discovery/zk/internal/ZkNoServersMessage.java | 3 +-
.../zk/internal/ZookeeperDiscoveryImpl.java | 2 +-
15 files changed, 107 insertions(+), 83 deletions(-)
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
index 58badd5a3d0..cf62696f3a7 100644
---
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
+++
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
@@ -72,8 +75,12 @@ public class MessageProcessor extends AbstractProcessor {
/** Externalizable message. */
static final String MARSHALLABLE_MESSAGE_INTERFACE =
"org.apache.ignite.internal.MarshallableMessage";
- /** This is the only message with zero fields. A serializer must be
generated due to restrictions in our communication process. */
- static final String HANDSHAKE_WAIT_MESSAGE =
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
+ /** Messages with no fields. A serializer must be generated due to
restrictions in our communication process. */
+ static final String[] EMPTY_MESSAGES = {
+
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage",
+ "org.apache.ignite.spi.discovery.zk.internal.ZkNoServersMessage",
+
"org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Null",
+ };
/** */
private final Map<String, IgniteBiTuple<String, String>> enumMappersInUse
= new HashMap<>();
@@ -83,7 +90,11 @@ public class MessageProcessor extends AbstractProcessor {
*/
@Override public boolean process(Set<? extends TypeElement> annotations,
RoundEnvironment roundEnv) {
TypeMirror msgType =
processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
- TypeMirror handshakeWaitMsgType =
processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
+ List<TypeMirror> emptyMsgs = Arrays.stream(EMPTY_MESSAGES)
+ .map(cls -> processingEnv.getElementUtils().getTypeElement(cls))
+ .filter(Objects::nonNull)
+ .map(Element::asType)
+ .collect(Collectors.toList());
Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
@@ -101,7 +112,7 @@ public class MessageProcessor extends AbstractProcessor {
List<VariableElement> fields = orderedFields(clazz);
- if (!fields.isEmpty() ||
processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
+ if (!fields.isEmpty() || emptyMsgs.stream().anyMatch(t ->
processingEnv.getTypeUtils().isAssignable(clazz.asType(), t)))
msgFields.put(clazz, fields);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 84f76d21449..315130fe561 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -103,6 +103,7 @@ import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImp
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
@@ -214,6 +215,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.tracing.TracingConfigurationManager;
@@ -1315,9 +1317,6 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
private void initMessageFactory() throws IgniteCheckedException {
MessageFactoryProvider[] msgs =
ctx.plugins().extensions(MessageFactoryProvider.class);
- if (msgs == null)
- msgs = new MessageFactoryProvider[0];
-
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(),
ctx.marshallerContext().jdkMarshaller(),
@@ -1330,6 +1329,15 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
compMsgs.add(f);
}
+ DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
+
+ if (discoSpi instanceof IgniteDiscoverySpi) {
+ MessageFactoryProvider discoMsgs =
((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();
+
+ if (discoMsgs != null)
+ compMsgs.add(discoMsgs);
+ }
+
if (!compMsgs.isEmpty())
msgs = F.concat(msgs, compMsgs.toArray(new
MessageFactoryProvider[compMsgs.size()]));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 54cc67ba987..f08a653bffb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.managers.discovery;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
/**
@@ -57,4 +58,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
* @param err Connection error.
*/
public void resolveCommunicationFailure(ClusterNode node, Exception err);
+
+ /** @return Message factory provider. */
+ public default MessageFactoryProvider messageFactoryProvider() {
+ return null;
+ }
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
index 90b6bbde5ab..5fcdf6988fc 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.h2.twostep.msg;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.Order;
import org.h2.value.Value;
import org.h2.value.ValueNull;
@@ -29,10 +28,6 @@ public class GridH2Null extends GridH2ValueMessage {
/** */
public static GridH2Null INSTANCE = new GridH2Null();
- /** Dummy field to use codegen serializer. */
- @Order(0)
- byte dummy;
-
/**
* Disallow new instance creation.
*/
@@ -45,7 +40,6 @@ public class GridH2Null extends GridH2ValueMessage {
return ValueNull.INSTANCE;
}
-
/** {@inheritDoc} */
@Override public String toString() {
return "NULL";
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
index d49dfd14b4e..7f0b6003755 100644
--- a/modules/zookeeper/pom.xml
+++ b/modules/zookeeper/pom.xml
@@ -40,6 +40,12 @@
<artifactId>ignite-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>ignite-codegen</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index e10799e1b42..1d0f2c68111 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteProductVersion;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
@@ -57,6 +58,7 @@ import
org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.zk.internal.ZkIgnitePaths;
+import org.apache.ignite.spi.discovery.zk.internal.ZkMessageFactory;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
import
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryStatistics;
@@ -554,6 +556,11 @@ public class ZookeeperDiscoverySpi extends
IgniteSpiAdapter implements IgniteDis
return locNode;
}
+ /** {@inheritDoc} */
+ @Override public MessageFactoryProvider messageFactoryProvider() {
+ return new ZkMessageFactory();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ZookeeperDiscoverySpi.class, this);
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
index a3944ac917c..6512c2ac9a4 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
@@ -19,7 +19,6 @@ package org.apache.ignite.spi.discovery.zk.internal;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -28,8 +27,6 @@ import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
@@ -42,25 +39,14 @@ import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMe
* Class is responsible for serializing discovery messages using RU-ready
{@link MessageSerializer} mechanism.
*/
public class DiscoveryMessageParser {
- /** Leading byte for messages use {@link JdkMarshaller} for serialization.
*/
- // TODO: remove these flags after refactoring all discovery messages.
- private static final byte JAVA_SERIALIZATION = (byte)1;
-
- /** Leading byte for messages use {@link MessageSerializer} for
serialization. */
- private static final byte MESSAGE_SERIALIZATION = (byte)2;
-
/** Size for an intermediate buffer for serializing discovery messages. */
private static final int MSG_BUFFER_SIZE = 100;
- /** */
- private final JdkMarshaller jdkMarshaller;
-
/** */
private final MessageFactory msgFactory;
/** */
- public DiscoveryMessageParser(JdkMarshaller jdkMarshaller, MessageFactory
msgFactory) {
- this.jdkMarshaller = jdkMarshaller;
+ public DiscoveryMessageParser(MessageFactory msgFactory) {
this.msgFactory = msgFactory;
}
@@ -69,16 +55,7 @@ public class DiscoveryMessageParser {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
- if (msg instanceof Message) {
- out.write(MESSAGE_SERIALIZATION);
-
- serializeMessage((Message)msg, out);
- }
- else {
- out.write(JAVA_SERIALIZATION);
-
- U.marshal(jdkMarshaller, msg, out);
- }
+ serializeMessage((Message)msg, out);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to serialize message: " +
msg, e);
@@ -93,14 +70,6 @@ public class DiscoveryMessageParser {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InflaterInputStream in = new InflaterInputStream(bais)
) {
- byte mode = (byte)in.read();
-
- if (mode == JAVA_SERIALIZATION)
- return U.unmarshal(jdkMarshaller, in, U.gridClassLoader());
-
- if (MESSAGE_SERIALIZATION != mode)
- throw new IOException("Received unexpected byte while reading
discovery message: " + mode);
-
return (DiscoverySpiCustomMessage)deserializeMessage(in);
}
catch (Exception e) {
@@ -144,14 +113,15 @@ public class DiscoveryMessageParser {
do {
int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
- if (read == -1)
- throw new EOFException("Stream closed before message was fully
read.");
-
- msgBuf.limit(msgBuf.position() + read);
- msgBuf.rewind();
+ if (read > 0) {
+ msgBuf.limit(msgBuf.position() + read);
+ msgBuf.rewind();
+ }
finished = msgSer.readFrom(msg, msgReader);
+ assert read != -1 || finished : "Stream closed before message was
fully read.";
+
if (!finished)
msgBuf.compact();
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 1dff2edf88a..d78606da464 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -18,25 +18,35 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-class ZkCommunicationErrorResolveFinishMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage {
+class ZkCommunicationErrorResolveFinishMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- final UUID futId;
+ @Order(0)
+ UUID futId;
/** */
- final long topVer;
+ @Order(1)
+ long topVer;
/** */
- transient ZkCommunicationErrorResolveResult res;
+ ZkCommunicationErrorResolveResult res;
+
+ /** Constructor for {@link MessageFactory}. */
+ public ZkCommunicationErrorResolveFinishMessage() {
+ // No-op.
+ }
/**
* @param futId Future ID.
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index 763fe51147b..b5b7c046a83 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -18,19 +18,28 @@
package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/**
* Zk Communication Error Resolve Start Message.
*/
-public class ZkCommunicationErrorResolveStartMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage {
+public class ZkCommunicationErrorResolveStartMessage implements
DiscoverySpiCustomMessage, ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- final UUID id;
+ @Order(0)
+ UUID id;
+
+ /** Constructor for {@link MessageFactory}. */
+ public ZkCommunicationErrorResolveStartMessage() {
+ // No-op.
+ }
/**
* @param id Unique ID.
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
index 060f354bdda..3f40be08fc8 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -17,22 +17,32 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/**
* Zk Force Node Fail Message.
*/
-public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage {
+public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- final long nodeInternalId;
+ @Order(0)
+ long nodeInternalId;
/** */
- final String warning;
+ @Order(1)
+ String warning;
+
+ /** Constructor for {@link MessageFactory}. */
+ public ZkForceNodeFailMessage() {
+ // No-op.
+ }
/**
* @param nodeInternalId Node ID.
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
index 7e2f9dab120..fc0a0f7c4bf 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -17,10 +17,12 @@
package org.apache.ignite.spi.discovery.zk.internal;
+import java.io.Serializable;
+
/**
*
*/
-class ZkInternalJoinErrorMessage implements ZkInternalMessage {
+class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable {
/** */
private static final long serialVersionUID = 0L;
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
index c1d56f0eeda..3bb2c09338c 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -17,11 +17,9 @@
package org.apache.ignite.spi.discovery.zk.internal;
-import java.io.Serializable;
-
/**
*
*/
-interface ZkInternalMessage extends Serializable {
+interface ZkInternalMessage {
// No-op.
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
similarity index 54%
copy from
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
copy to
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
index 82c804de1f2..39d89b32af8 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
@@ -17,24 +17,16 @@
package org.apache.ignite.spi.discovery.zk.internal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class ZkNoServersMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
- return null;
- }
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+/** */
+public class ZkMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ZkNoServersMessage.class, this);
+ @Override public void registerAll(MessageFactory factory) {
+ factory.register(400, ZkCommunicationErrorResolveFinishMessage::new,
new ZkCommunicationErrorResolveFinishMessageSerializer());
+ factory.register(401, ZkCommunicationErrorResolveStartMessage::new,
new ZkCommunicationErrorResolveStartMessageSerializer());
+ factory.register(402, ZkForceNodeFailMessage::new, new
ZkForceNodeFailMessageSerializer());
+ factory.register(403, ZkNoServersMessage::new, new
ZkNoServersMessageSerializer());
}
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
index 82c804de1f2..86fcebb8cbe 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -18,13 +18,14 @@
package org.apache.ignite.spi.discovery.zk.internal;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-class ZkNoServersMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage {
+class ZkNoServersMessage implements DiscoverySpiCustomMessage,
ZkInternalMessage, Message {
/** */
private static final long serialVersionUID = 0L;
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index e056f1d96a8..7abe3ddf1ed 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -269,7 +269,7 @@ public class ZookeeperDiscoveryImpl {
this.stats = stats;
- msgParser = new DiscoveryMessageParser(jdkMarshaller, msgFactory);
+ msgParser = new DiscoveryMessageParser(msgFactory);
}
/**