This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e847e4d32b8 IGNITE-26230 Introduce MessageFormat (#6466)
e847e4d32b8 is described below
commit e847e4d32b8184e13c3e251433dd728e4870ca85
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Aug 22 18:50:42 2025 +0400
IGNITE-26230 Introduce MessageFormat (#6466)
---
.../network/serialization/MessageFormat.java | 39 ++++++++++++++++++++
.../internal/network/NaiveMessageFormat.java | 42 ++++++++++++++++++++++
.../internal/network/netty/InboundDecoder.java | 9 +++--
.../internal/network/netty/OutboundEncoder.java | 11 +++---
.../internal/network/netty/PipelineUtils.java | 8 +++--
.../internal/network/netty/InboundDecoderTest.java | 16 +++++----
.../network/serialization/MarshallableTest.java | 7 ++--
7 files changed, 115 insertions(+), 17 deletions(-)
diff --git
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
new file mode 100644
index 00000000000..96bc5d329f3
--- /dev/null
+++
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/serialization/MessageFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+/**
+ * Defines message format, meaning how message should be read or written.
+ */
+public interface MessageFormat {
+ /**
+ * Creates a new message writer.
+ *
+ * @param serializationRegistry Registry to use.
+ * @param protoVer Binary protocol version to use for serialization.
+ */
+ MessageWriter writer(MessageSerializationRegistry serializationRegistry,
byte protoVer);
+
+ /**
+ * Creates a new message reader.
+ *
+ * @param serializationRegistry Registry to use.
+ * @param protoVer Binary protocol version to use for deserialization.
+ */
+ MessageReader reader(MessageSerializationRegistry serializationRegistry,
byte protoVer);
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
new file mode 100644
index 00000000000..0103a598f38
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/NaiveMessageFormat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
+import org.apache.ignite.internal.network.serialization.MessageReader;
+import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
+
+/**
+ * Naive message format that uses direct serialization and deserialization.
Both nodes must be of the same version for this format to work.
+ *
+ * <p>This is the default message format used in Ignite.
+ */
+public class NaiveMessageFormat implements MessageFormat {
+ @Override
+ public MessageWriter writer(MessageSerializationRegistry
serializationRegistry, byte protoVer) {
+ return new DirectMessageWriter(serializationRegistry, protoVer);
+ }
+
+ @Override
+ public MessageReader reader(MessageSerializationRegistry
serializationRegistry, byte protoVer) {
+ return new DirectMessageReader(serializationRegistry, protoVer);
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 9a859bb7796..e4096e7662d 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -27,9 +27,9 @@ import java.util.List;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.direct.DirectMessageReader;
import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.serialization.MessageDeserializer;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
import org.apache.ignite.internal.network.serialization.MessageReader;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -52,6 +52,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
/** Message group type, for partially read message headers. */
private static final AttributeKey<Short> GROUP_TYPE_KEY =
AttributeKey.valueOf("GROUP_TYPE");
+ private final MessageFormat messageFormat;
+
/** Serialization service. */
private final PerSessionSerializationService serializationService;
@@ -60,7 +62,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
*
* @param serializationService Serialization service.
*/
- public InboundDecoder(PerSessionSerializationService serializationService)
{
+ public InboundDecoder(MessageFormat messageFormat,
PerSessionSerializationService serializationService) {
+ this.messageFormat = messageFormat;
this.serializationService = serializationService;
}
@@ -73,7 +76,7 @@ public class InboundDecoder extends ByteToMessageDecoder {
MessageReader reader = readerAttr.get();
if (reader == null) {
- reader = new
DirectMessageReader(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ reader =
messageFormat.reader(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
readerAttr.set(reader);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f842190a3f5..82330c28845 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -32,15 +32,15 @@ import java.util.List;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
import org.apache.ignite.internal.network.serialization.MessageSerializer;
import org.apache.ignite.internal.network.serialization.MessageWriter;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
/**
- * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
+ * An encoder for the outbound messages that uses the provided {@link
MessageFormat}.
*/
public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject>
{
/** Handler name. */
@@ -53,6 +53,8 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
/** Message writer channel attribute key. */
private static final AttributeKey<MessageWriter> WRITER_KEY =
AttributeKey.valueOf("WRITER");
+ private final MessageFormat messageFormat;
+
/** Serialization registry. */
private final PerSessionSerializationService serializationService;
@@ -61,7 +63,8 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
*
* @param serializationService Serialization service.
*/
- public OutboundEncoder(PerSessionSerializationService
serializationService) {
+ public OutboundEncoder(MessageFormat messageFormat,
PerSessionSerializationService serializationService) {
+ this.messageFormat = messageFormat;
this.serializationService = serializationService;
}
@@ -71,7 +74,7 @@ public class OutboundEncoder extends
MessageToMessageEncoder<OutNetworkObject> {
MessageWriter writer = writerAttr.get();
if (writer == null) {
- writer = new
DirectMessageWriter(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ writer =
messageFormat.writer(serializationService.serializationRegistry(),
ConnectionManager.DIRECT_PROTOCOL_VERSION);
writerAttr.set(writer);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
index 7d5faac9dfc..c905e8ac6e3 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
@@ -22,9 +22,11 @@ import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
/** Pipeline utils. */
@@ -58,13 +60,15 @@ public class PipelineUtils {
*/
public static void setup(ChannelPipeline pipeline,
PerSessionSerializationService serializationService,
HandshakeManager handshakeManager, Consumer<InNetworkObject>
messageListener) {
+ MessageFormat messageFormat = new NaiveMessageFormat();
+
// Consolidate flushes to bigger ones (improves throughput with
smaller messages at the price of the latency).
pipeline.addLast(new
FlushConsolidationHandler(FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES,
true));
- pipeline.addLast(InboundDecoder.NAME, new
InboundDecoder(serializationService));
+ pipeline.addLast(InboundDecoder.NAME, new
InboundDecoder(messageFormat, serializationService));
pipeline.addLast(HandshakeHandler.NAME, new
HandshakeHandler(handshakeManager, messageListener, serializationService));
pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new
ChunkedWriteHandler());
- pipeline.addLast(OutboundEncoder.NAME, new
OutboundEncoder(serializationService));
+ pipeline.addLast(OutboundEncoder.NAME, new
OutboundEncoder(messageFormat, serializationService));
pipeline.addLast(IoExceptionSuppressingHandler.NAME, new
IoExceptionSuppressingHandler());
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 7a390bc22ed..3f87efd70c8 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -36,14 +36,16 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.ignite.internal.network.AllTypesMessageGenerator;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.messages.AllTypesMessage;
import org.apache.ignite.internal.network.messages.NestedMessageMessage;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.serialization.MessageFormat;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.MessageSerializer;
+import org.apache.ignite.internal.network.serialization.MessageWriter;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
import
org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
@@ -63,6 +65,8 @@ public class InboundDecoderTest extends
BaseIgniteAbstractTest {
/** Registry. */
private final MessageSerializationRegistry registry =
defaultSerializationRegistry();
+ private final MessageFormat messageFormat = new NaiveMessageFormat();
+
/**
* Tests that an {@link InboundDecoder} can successfully read a message
with all types supported by direct marshalling.
*
@@ -99,9 +103,9 @@ public class InboundDecoderTest extends
BaseIgniteAbstractTest {
private <T extends NetworkMessage> T sendAndReceive(T msg) {
var serializationService = new SerializationService(registry,
mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new
PerSessionSerializationService(serializationService);
- var channel = new EmbeddedChannel(new
InboundDecoder(perSessionSerializationService));
+ var channel = new EmbeddedChannel(new InboundDecoder(messageFormat,
perSessionSerializationService));
- var writer = new DirectMessageWriter(registry,
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ MessageWriter writer = messageFormat.writer(registry,
ConnectionManager.DIRECT_PROTOCOL_VERSION);
MessageSerializer<NetworkMessage> serializer =
registry.createSerializer(msg.groupType(), msg.messageType());
@@ -139,7 +143,7 @@ public class InboundDecoderTest extends
BaseIgniteAbstractTest {
public void testPartialHeader() throws Exception {
var serializationService = new SerializationService(registry,
mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new
PerSessionSerializationService(serializationService);
- var channel = new EmbeddedChannel(new
InboundDecoder(perSessionSerializationService));
+ var channel = new EmbeddedChannel(new InboundDecoder(messageFormat,
perSessionSerializationService));
ByteBuf buffer = allocator.buffer();
@@ -174,11 +178,11 @@ public class InboundDecoderTest extends
BaseIgniteAbstractTest {
var serializationService = new SerializationService(registry,
mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new
PerSessionSerializationService(serializationService);
- final var decoder = new InboundDecoder(perSessionSerializationService);
+ final var decoder = new InboundDecoder(messageFormat,
perSessionSerializationService);
final var list = new ArrayList<>();
- var writer = new DirectMessageWriter(registry,
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ MessageWriter writer = messageFormat.writer(registry,
ConnectionManager.DIRECT_PROTOCOL_VERSION);
var msg = new
TestMessagesFactory().testMessage().msg("abcdefghijklmn").build();
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index 3e21902f078..3e6ee0bd3a9 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.ignite.internal.network.NaiveMessageFormat;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.messages.MessageWithMarshallable;
@@ -68,6 +69,8 @@ public class MarshallableTest extends BaseIgniteAbstractTest {
private final TestMessagesFactory msgFactory = new TestMessagesFactory();
+ private final MessageFormat messageFormat = new NaiveMessageFormat();
+
/**
* Tests that marshallable object can be serialized along with its
descriptor.
*/
@@ -95,7 +98,7 @@ public class MarshallableTest extends BaseIgniteAbstractTest {
var channel = new EmbeddedChannel(
new ChunkedWriteHandler(),
- new OutboundEncoder(serializers.perSessionSerializationService)
+ new OutboundEncoder(messageFormat,
serializers.perSessionSerializationService)
);
List<ClassDescriptorMessage> classDescriptorsMessages =
PerSessionSerializationService.createClassDescriptorsMessages(
@@ -134,7 +137,7 @@ public class MarshallableTest extends
BaseIgniteAbstractTest {
PerSessionSerializationService perSessionSerializationService =
serializers.perSessionSerializationService;
ClassDescriptor descriptor = serializers.descriptor;
- final var decoder = new InboundDecoder(perSessionSerializationService);
+ final var decoder = new InboundDecoder(messageFormat,
perSessionSerializationService);
int size = outBuffer.position();