This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 40d9b614bf8 IGNITE-28061 Migrate ZookeeperDiscoverySpi to new
serialization framework for discovery custom messages (#12860)
40d9b614bf8 is described below
commit 40d9b614bf876b7b59d29e199ab8c018504f3d32
Author: Nikita Amelchev <[email protected]>
AuthorDate: Wed Mar 11 17:22:19 2026 +0300
IGNITE-28061 Migrate ZookeeperDiscoverySpi to new serialization framework
for discovery custom messages (#12860)
---
.../zk/internal/DiscoveryMessageParser.java | 167 +++++++++++++++++++++
.../zk/internal/ZkDiscoveryCustomEventData.java | 21 ++-
.../zk/internal/ZookeeperDiscoveryImpl.java | 59 +++-----
3 files changed, 208 insertions(+), 39 deletions(-)
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
new file mode 100644
index 00000000000..bd2dd5857b5
--- /dev/null
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.spi.IgniteSpiException;
+
+import static
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
+
+/**
+ * Serializes and deserializes discovery custom messages: the RU-ready {@link MessageSerializer} mechanism is used
for messages implementing {@link Message}, the configured {@link Marshaller} for all other messages.
+ */
+public class DiscoveryMessageParser {
+ /** Leading byte marking a message that was serialized with {@link JdkMarshaller}.
*/
+ // TODO: remove these flags after refactoring all discovery messages.
+ private static final byte JAVA_SERIALIZATION = (byte)1;
+
+ /** Leading byte marking a message that was serialized with
{@link MessageSerializer}. */
+ private static final byte MESSAGE_SERIALIZATION = (byte)2;
+
+ /** Size in bytes of the intermediate buffer used to serialize and deserialize discovery messages. */
+ private static final int MSG_BUFFER_SIZE = 100;
+
+ /** Message factory used to create messages by direct type and to obtain their serializers. */
+ private final MessageFactory msgFactory;
+
+ /** Marshaller used for messages that do not implement {@link Message}. */
+ private final Marshaller marsh;
+
+ /** @param marsh Marshaller used for messages that do not implement {@link Message}. */
+ public DiscoveryMessageParser(Marshaller marsh) {
+ this.marsh = marsh;
+ this.msgFactory = new IgniteMessageFactoryImpl(
+ new MessageFactoryProvider[] { new DiscoveryMessageFactory(null,
null) });
+ }
+
+ /** Marshals discovery message to a deflate-compressed bytes array: a leading mode byte followed by the payload; any failure is rethrown as {@link IgniteSpiException}. */
+ public byte[] marshalZip(DiscoveryCustomMessage msg) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
+ if (msg instanceof Message) {
+ out.write(MESSAGE_SERIALIZATION);
+
+ serializeMessage((Message)msg, out);
+ }
+ else {
+ out.write(JAVA_SERIALIZATION);
+
+ U.marshal(marsh, msg, out);
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException("Failed to serialize message: " +
msg, e);
+ }
+
+ return baos.toByteArray(); // Read only after the deflater stream has been closed by try-with-resources.
+ }
+
+ /** Unmarshals discovery message from a compressed bytes array in the format written by {@link #marshalZip}; any failure is rethrown as {@link IgniteSpiException}. */
+ public DiscoveryCustomMessage unmarshalZip(byte[] bytes) {
+ try (
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ InflaterInputStream in = new InflaterInputStream(bais)
+ ) {
+ byte mode = (byte)in.read(); // End of stream (-1) matches neither flag and ends in the IOException below.
+
+ if (mode == JAVA_SERIALIZATION)
+ return U.unmarshal(marsh, in, U.gridClassLoader());
+
+ if (MESSAGE_SERIALIZATION != mode)
+ throw new IOException("Received unexpected byte while reading
discovery message: " + mode);
+
+ return (DiscoveryCustomMessage)deserializeMessage(in);
+ }
+ catch (Exception e) {
+ throw new IgniteSpiException("Failed to deserialize message.", e);
+ }
+ }
+
+ /** Writes the message to the stream in chunks of at most {@link #MSG_BUFFER_SIZE} bytes until its serializer reports completion. */
+ private void serializeMessage(Message m, OutputStream out) throws
IOException {
+ DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory);
+ ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+ msgWriter.setBuffer(msgBuf);
+
+ MessageSerializer msgSer = msgFactory.serializer(m.directType());
+
+ boolean finished;
+
+ do {
+ msgBuf.clear(); // Reuse the same buffer for the next chunk.
+
+ finished = msgSer.writeTo(m, msgWriter);
+
+ out.write(msgBuf.array(), 0, msgBuf.position()); // Flush only the bytes written during this pass.
+ }
+ while (!finished);
+ }
+
+ /** Reads the two message type bytes from the stream, creates the message and fills it chunk by chunk until its serializer reports completion. */
+ private Message deserializeMessage(InputStream in) throws IOException {
+ DirectMessageReader msgReader = new DirectMessageReader(msgFactory,
null);
+ ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
+
+ msgReader.setBuffer(msgBuf);
+
+ Message msg = msgFactory.create(makeMessageType((byte)in.read(),
(byte)in.read()));
+ MessageSerializer msgSer = msgFactory.serializer(msg.directType());
+
+ boolean finished;
+
+ do {
+ int read = in.read(msgBuf.array(), msgBuf.position(),
msgBuf.remaining());
+
+ if (read == -1)
+ throw new EOFException("Stream closed before message was fully
read.");
+
+ msgBuf.limit(msgBuf.position() + read); // Expose only the bytes that are available so far.
+ msgBuf.rewind();
+
+ finished = msgSer.readFrom(msg, msgReader);
+
+ if (!finished)
+ msgBuf.compact(); // Move unread bytes to the buffer start; the next read appends after them.
+ }
+ while (!finished);
+
+ return msg;
+ }
+}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index a4db36079e8..87754e169c3 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.zk.internal;
import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
/**
*
@@ -38,8 +37,8 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData
{
/** */
final String evtPath;
- /** Message instance (can be marshalled as part of
ZkDiscoveryCustomEventData or stored in separate znode. */
- DiscoverySpiCustomMessage msg;
+ /** Marshalled message bytes (can be marshalled as part of ZkDiscoveryCustomEventData or
stored in a separate znode). */
+ byte[] msgBytes;
/** Unmarshalled message. */
transient DiscoveryCustomMessage resolvedMsg;
@@ -57,7 +56,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData
{
long origEvtId,
long topVer,
UUID sndNodeId,
- DiscoverySpiCustomMessage msg,
+ DiscoveryCustomMessage msg,
String evtPath
) {
super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
@@ -66,11 +65,23 @@ class ZkDiscoveryCustomEventData extends
ZkDiscoveryEventData {
assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);
this.origEvtId = origEvtId;
- this.msg = msg;
+ this.resolvedMsg = msg;
this.sndNodeId = sndNodeId;
this.evtPath = evtPath;
}
+ /** Serializes {@link #resolvedMsg}, if it is set, into {@link #msgBytes} using the given parser. */
+ public void prepareMarshal(DiscoveryMessageParser parser) {
+ if (resolvedMsg != null)
+ msgBytes = parser.marshalZip(resolvedMsg);
+ }
+
+ /** Restores {@link #resolvedMsg} from {@link #msgBytes}, if they are set, using the given parser. */
+ public void finishUnmarshal(DiscoveryMessageParser parser) {
+ if (msgBytes != null)
+ resolvedMsg = parser.unmarshalZip(msgBytes);
+ }
+
/**
* @return {@code True} for custom event ack message.
*/
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index fd414cf3da0..ddc79f498b0 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -216,6 +216,9 @@ public class ZookeeperDiscoveryImpl {
/** */
private final ZookeeperDiscoveryStatistics stats;
+ /** */
+ private final DiscoveryMessageParser msgParser;
+
/**
* @param spi Discovery SPI.
* @param igniteInstanceName Instance name.
@@ -262,6 +265,8 @@ public class ZookeeperDiscoveryImpl {
this.evtsAckThreshold = evtsAckThreshold;
this.stats = stats;
+
+ msgParser = new DiscoveryMessageParser(marsh);
}
/**
@@ -666,14 +671,7 @@ public class ZookeeperDiscoveryImpl {
if (!hasServerNode)
throw new IgniteException("Failed to send custom message: no
server nodes in topology.");
- byte[] msgBytes;
-
- try {
- msgBytes = marshalZip(msg);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSpiException("Failed to marshal custom message: "
+ msg, e);
- }
+ byte[] msgBytes = msgParser.marshalZip(msg);
while (!busyLock.enterBusy())
checkState();
@@ -1486,6 +1484,8 @@ public class ZookeeperDiscoveryImpl {
new ZkNoServersMessage(),
null);
+ evtData.prepareMarshal(msgParser);
+
Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
evtsData.addEvent(nodesToAck, evtData);
@@ -1514,11 +1514,13 @@ public class ZookeeperDiscoveryImpl {
if (evtData instanceof ZkDiscoveryCustomEventData) {
ZkDiscoveryCustomEventData evtData0 =
(ZkDiscoveryCustomEventData)evtData;
+ evtData0.finishUnmarshal(msgParser);
+
// It is possible previous coordinator failed before finished
cleanup.
- if (evtData0.msg instanceof
ZkCommunicationErrorResolveFinishMessage) {
+ if (evtData0.resolvedMsg instanceof
ZkCommunicationErrorResolveFinishMessage) {
try {
ZkCommunicationErrorResolveFinishMessage msg =
-
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
+
(ZkCommunicationErrorResolveFinishMessage)evtData0.resolvedMsg;
ZkCommunicationErrorResolveResult res = unmarshalZip(
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths,
msg.futId));
@@ -2472,7 +2474,7 @@ public class ZookeeperDiscoveryImpl {
DiscoveryCustomMessage msg;
try {
- msg = unmarshalZip(evtBytes);
+ msg = msgParser.unmarshalZip(evtBytes);
}
catch (Exception e) {
U.error(log, "Failed to unmarshal custom discovery
message: " + e, e);
@@ -2559,11 +2561,9 @@ public class ZookeeperDiscoveryImpl {
0L,
evtsData.topVer,
sndNode.id(),
- null,
+ msg,
evtPath);
- evtData.resolvedMsg = msg;
-
if (log.isDebugEnabled())
log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" +
msg + ']');
@@ -2746,19 +2746,16 @@ public class ZookeeperDiscoveryImpl {
if (evtData0.ackEvent() && evtData0.topologyVersion()
< locNode.order())
break;
- DiscoveryCustomMessage msg;
+ evtData0.finishUnmarshal(msgParser);
- if (rtState.crd) {
+ if (rtState.crd)
assert evtData0.resolvedMsg != null : evtData0;
-
- msg = evtData0.resolvedMsg;
- }
else {
- if (evtData0.msg == null) {
+ if (evtData0.resolvedMsg == null) {
if (evtData0.ackEvent()) {
String path =
zkPaths.ackEventDataPath(evtData0.origEvtId);
- msg = unmarshalZip(zkClient.getData(path));
+ evtData0.resolvedMsg =
msgParser.unmarshalZip(zkClient.getData(path));
}
else {
assert evtData0.evtPath != null : evtData0;
@@ -2767,19 +2764,15 @@ public class ZookeeperDiscoveryImpl {
evtData0.evtPath,
evtData0.sndNodeId);
- msg = unmarshalZip(msgBytes);
+ evtData0.resolvedMsg =
msgParser.unmarshalZip(msgBytes);
}
}
- else
- msg = evtData0.msg;
-
- evtData0.resolvedMsg = msg;
}
- if (msg instanceof ZkInternalMessage)
- processInternalMessage(evtData0,
(ZkInternalMessage)msg);
+ if (evtData0.resolvedMsg instanceof ZkInternalMessage)
+ processInternalMessage(evtData0,
(ZkInternalMessage)evtData0.resolvedMsg);
else {
- notifyCustomEvent(evtData0, msg);
+ notifyCustomEvent(evtData0, evtData0.resolvedMsg);
if (!evtData0.ackEvent())
updateNodeInfo = true;
@@ -3455,7 +3448,7 @@ public class ZookeeperDiscoveryImpl {
msg,
null);
- evtData.resolvedMsg = msg;
+ evtData.prepareMarshal(msgParser);
evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
@@ -3770,7 +3763,7 @@ public class ZookeeperDiscoveryImpl {
long evtId = rtState.evtsData.evtIdGen;
- byte[] ackBytes = marshalZip(ack);
+ byte[] ackBytes = msgParser.marshalZip(ack);
String path = zkPaths.ackEventDataPath(origEvt.eventId());
@@ -3788,11 +3781,9 @@ public class ZookeeperDiscoveryImpl {
origEvt.eventId(),
rtState.evtsData.topVer, // Use actual topology version because
topology version must be growing.
locNode.id(),
- null,
+ ack,
null);
- ackEvtData.resolvedMsg = ack;
-
if (log.isDebugEnabled()) {
log.debug("Generated CUSTOM event ack [origEvtId=" +
origEvt.eventId() +
", evt=" + ackEvtData +