This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new ebc9f90  Message marshalling refactoring
ebc9f90 is described below

commit ebc9f9000cefd6c46300a63b5853eb1f31d228cd
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Tue Oct 6 12:49:52 2020 +0300

    Message marshalling refactoring
---
 .../calcite/message/CalciteMessageFactory.java     | 23 +++-------
 .../query/calcite/message/ErrorMessage.java        | 10 ++---
 ...ricRowMessage.java => GenericValueMessage.java} | 40 ++++++++---------
 .../query/calcite/message/MarshalableMessage.java  | 10 ++---
 ...MessageService.java => MarshallingContext.java} | 51 ++++++++++------------
 .../query/calcite/message/MessageService.java      | 12 -----
 .../query/calcite/message/MessageServiceImpl.java  | 15 ++++---
 .../query/calcite/message/MessageType.java         |  3 +-
 .../query/calcite/message/QueryBatchMessage.java   | 27 +++++-------
 .../query/calcite/message/QueryStartRequest.java   | 13 +++---
 .../query/calcite/message/QueryStartResponse.java  | 10 ++---
 .../{MarshalableMessage.java => ValueMessage.java} | 29 ++++++------
 .../query/calcite/metadata/NodesMapping.java       |  7 ++-
 .../query/calcite/prepare/FragmentDescription.java | 11 +++--
 14 files changed, 109 insertions(+), 152 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
index 61b77f3..9774f4e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.function.Supplier;
-
 import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 
 /**
@@ -35,23 +33,12 @@ public class CalciteMessageFactory implements 
MessageFactoryProvider {
     }
 
     /**
-     * Produces a row message.
-     *
-     * TODO In future a row is expected to implement Message interface.
-     */
-    public static Message asMessage(Object row) {
-        return new GenericRowMessage(row);
-    }
-
-    /**
-     * Produces a row from a message.
-     *
-     * TODO In future a row is expected to implement Message interface.
+     * Produces a value message.
      */
-    public static Object asRow(Message mRow) {
-        if (mRow instanceof GenericRowMessage)
-            return ((GenericRowMessage) mRow).row();
+    public static ValueMessage asMessage(Object val) {
+        if (val == null)
+            return null;
 
-        throw new AssertionError("Unexpected message type. [message=" + mRow + 
"]");
+        return new GenericValueMessage(val);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
index de45c18..6d09c09 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java
@@ -19,10 +19,8 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.nio.ByteBuffer;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
@@ -163,12 +161,12 @@ public class ErrorMessage implements MarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
-        errBytes = marshaller.marshal(err);
+    @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        errBytes = ctx.marshal(err);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) throws IgniteCheckedException {
-        err = marshaller.unmarshal(errBytes, loader);
+    @Override public void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        err = ctx.unmarshal(errBytes);
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
similarity index 71%
rename from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java
rename to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
index ae71e6e..f4c8d76 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
@@ -18,49 +18,47 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.nio.ByteBuffer;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class GenericRowMessage implements MarshalableMessage {
+public final class GenericValueMessage implements ValueMessage {
     /** */
     @GridDirectTransient
-    private Object row;
+    private Object val;
 
     /** */
-    private byte[] serRow;
+    private byte[] serialized;
 
     /** */
-    public GenericRowMessage() {
+    public GenericValueMessage() {
 
     }
 
     /** */
-    public GenericRowMessage(Object row) {
-        this.row = row;
+    public GenericValueMessage(Object val) {
+        this.val = val;
     }
 
-    /** */
-    public Object row() {
-        return row;
+    /** {@inheritDoc} */
+    @Override public Object value() {
+        return val;
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
-        if (row != null && serRow == null)
-            serRow = marshaller.marshal(row);
+    @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        if (val != null && serialized == null)
+            serialized = ctx.marshal(val);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) throws IgniteCheckedException {
-        if (serRow != null && row == null)
-            row = marshaller.unmarshal(serRow, loader);
+    @Override public void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        if (serialized != null && val == null)
+            val = ctx.unmarshal(serialized);
     }
 
     /** {@inheritDoc} */
@@ -76,7 +74,7 @@ public class GenericRowMessage implements MarshalableMessage {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeByteArray("serRow", serRow))
+                if (!writer.writeByteArray("serialized", serialized))
                     return false;
 
                 writer.incrementState();
@@ -95,7 +93,7 @@ public class GenericRowMessage implements MarshalableMessage {
 
         switch (reader.state()) {
             case 0:
-                serRow = reader.readByteArray("serRow");
+                serialized = reader.readByteArray("serialized");
 
                 if (!reader.isLastRead())
                     return false;
@@ -104,12 +102,12 @@ public class GenericRowMessage implements 
MarshalableMessage {
 
         }
 
-        return reader.afterMessageRead(GenericRowMessage.class);
+        return reader.afterMessageRead(GenericValueMessage.class);
     }
 
     /** {@inheritDoc} */
     @Override public MessageType type() {
-        return MessageType.GENERIC_ROW_MESSAGE;
+        return MessageType.GENERIC_VALUE_MESSAGE;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
index b555157..26db96b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.marshaller.Marshaller;
 
 /**
  *
@@ -27,15 +26,14 @@ public interface MarshalableMessage extends CalciteMessage {
     /**
      * Prepares the message before sending.
      *
-     * @param marshaller Marchaller.
+     * @param ctx Marshalling context.
      */
-    void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException;
+    void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException;
 
     /**
      * Prepares the message before processing.
      *
-     * @param marshaller Marchaller.
-     * @param loader Class loader.
+     * @param ctx Marshalling context.
      */
-    void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws 
IgniteCheckedException;
+    void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException;
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java
similarity index 59%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java
index 1376c00..1599ea8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java
@@ -17,47 +17,42 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.query.calcite.util.Service;
 import org.apache.ignite.marshaller.Marshaller;
+import org.jetbrains.annotations.Nullable;
 
-/**
- *
- */
-public interface MessageService extends Service {
+/** */
+public interface MarshallingContext {
     /**
-     * Sends a message to given node.
-     *
-     * @param nodeId Node ID.
-     * @param msg Message.
+     * @return Message marshaller.
      */
-    void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+    Marshaller marshaller();
 
     /**
-     * Checks whether a node with given ID is alive.
-     *
-     * @param nodeId Node ID.
-     * @return {@code True} if node is alive.
+     * @return Message class loader.
      */
-    boolean alive(UUID nodeId);
+    ClassLoader classLoader();
 
     /**
-     * Registers a listener for messages of a given type.
+     * Unmarshals object from byte array.
      *
-     * @param lsnr Listener.
-     * @param type Message type.
+     * @param <T> Type of unmarshalled object.
+     * @param bytes Byte array.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If unmarshalling failed.
      */
-    void register(MessageListener lsnr, MessageType type);
+    default <T> T unmarshal(byte[] bytes) throws IgniteCheckedException {
+        return marshaller().unmarshal(bytes, classLoader());
+    }
 
     /**
-     * @return Message marshaller.
-     */
-    Marshaller marshaller();
-
-    /**
-     * @return Message class loader.
+     * Marshals object to byte array.
+     *
+     * @param obj Object to marshal. {@code null} object will be marshalled to binary {@code null} representation.
+     * @return Byte array.
+     * @throws IgniteCheckedException If marshalling failed.
      */
-    ClassLoader classLoader();
+    default byte[] marshal(@Nullable Object obj) throws IgniteCheckedException 
{
+        return marshaller().marshal(obj);
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 1376c00..96b2aee 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -18,10 +18,8 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
-import org.apache.ignite.marshaller.Marshaller;
 
 /**
  *
@@ -50,14 +48,4 @@ public interface MessageService extends Service {
      * @param type Message type.
      */
     void register(MessageListener lsnr, MessageType type);
-
-    /**
-     * @return Message marshaller.
-     */
-    Marshaller marshaller();
-
-    /**
-     * @return Message class loader.
-     */
-    ClassLoader classLoader();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index 1061c6e..c3b8cc2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -21,7 +21,6 @@ import java.util.EnumMap;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
@@ -45,20 +44,26 @@ import 
org.apache.ignite.plugin.extensions.communication.Message;
 /**
  *
  */
-public class MessageServiceImpl extends AbstractService implements 
MessageService {
+public class MessageServiceImpl extends AbstractService implements 
MessageService, MarshallingContext {
     /** */
     private final GridMessageListener msgLsnr;
 
+    /** */
     private UUID localNodeId;
 
+    /** */
     private GridIoManager ioManager;
 
+    /** */
     private ClassLoader classLoader;
 
+    /** */
     private QueryTaskExecutor taskExecutor;
 
+    /** */
     private FailureProcessor failureProcessor;
 
+    /** */
     private Marshaller marsh;
 
     /** */
@@ -222,9 +227,9 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     protected void prepareMarshal(Message msg) throws IgniteCheckedException {
         try {
             if (msg instanceof MarshalableMessage)
-                ((MarshalableMessage) msg).prepareMarshal(marshaller());
+                ((MarshalableMessage) msg).prepareMarshal(this);
         }
-        catch (IgniteCheckedException e) {
+        catch (Exception e) {
             failureProcessor().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
 
             throw e;
@@ -235,7 +240,7 @@ public class MessageServiceImpl extends AbstractService 
implements MessageServic
     protected void prepareUnmarshal(Message msg) throws IgniteCheckedException 
{
         try {
             if (msg instanceof MarshalableMessage)
-                ((MarshalableMessage) msg).prepareUnmarshal(marshaller(), 
classLoader());
+                ((MarshalableMessage) msg).prepareUnmarshal(this);
         }
         catch (Exception e) {
             failureProcessor().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index 6979b06..858a0ce 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.function.Supplier;
-
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
 
@@ -33,7 +32,7 @@ public enum MessageType {
     QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new),
     QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new),
     QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new),
-    GENERIC_ROW_MESSAGE(307, GenericRowMessage::new),
+    GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
 
     NODES_MAPPING(350, NodesMapping::new),
     FRAGMENT_DESCRIPTION(351, FragmentDescription::new);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
index 627c6d6..7b5c997 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
@@ -21,12 +21,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -55,8 +52,8 @@ public class QueryBatchMessage implements MarshalableMessage, 
ExecutionContextAw
     private List<Object> rows;
 
     /** */
-    @GridDirectCollection(Message.class)
-    private List<Message> mRows;
+    @GridDirectCollection(ValueMessage.class)
+    private List<ValueMessage> mRows;
 
     /** */
     public QueryBatchMessage() {
@@ -111,36 +108,36 @@ public class QueryBatchMessage implements 
MarshalableMessage, ExecutionContextAw
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
+    @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (mRows != null || rows == null)
             return;
 
         mRows = new ArrayList<>(rows.size());
 
         for (Object row : rows) {
-            Message mRow = CalciteMessageFactory.asMessage(row);
+            ValueMessage mRow = CalciteMessageFactory.asMessage(row);
+
+            assert mRow != null;
 
-            if (mRow instanceof MarshalableMessage)
-                ((MarshalableMessage) mRow).prepareMarshal(marshaller);
+            mRow.prepareMarshal(ctx);
 
             mRows.add(mRow);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) throws IgniteCheckedException {
+    @Override public void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (rows != null || mRows == null)
             return;
 
         rows = new ArrayList<>(mRows.size());
 
-        for (Message mRow : mRows) {
-            if (mRow instanceof MarshalableMessage)
-                ((MarshalableMessage) mRow).prepareUnmarshal(marshaller, 
loader);
+        for (ValueMessage mRow : mRows) {
+            assert mRow != null;
 
-            Object row = CalciteMessageFactory.asRow(mRow);
+            mRow.prepareUnmarshal(ctx);
 
-            rows.add(row);
+            rows.add(mRow.value());
         }
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index 8b9bc45..470ac3e 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
@@ -115,19 +114,19 @@ public class QueryStartRequest implements 
MarshalableMessage, ExecutionContextAw
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
+    @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (paramsBytes == null && params != null)
-            paramsBytes = marshaller.marshal(params);
+            paramsBytes = ctx.marshal(params);
 
-        fragmentDesc.prepareMarshal(marshaller);
+        fragmentDesc.prepareMarshal(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
ldr) throws IgniteCheckedException {
+    @Override public void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (params == null && paramsBytes != null)
-            params = marshaller.unmarshal(paramsBytes, ldr);
+            params = ctx.unmarshal(paramsBytes);
 
-        fragmentDesc.prepareUnmarshal(marshaller, ldr);
+        fragmentDesc.prepareUnmarshal(ctx);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
index 3118b43..5464f9d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
@@ -19,10 +19,8 @@ package 
org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.nio.ByteBuffer;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
@@ -80,15 +78,15 @@ public class QueryStartResponse implements 
MarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) throws 
IgniteCheckedException {
+    @Override public void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (error != null)
-            errBytes = marshaller.marshal(error);
+            errBytes = ctx.marshal(error);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) throws IgniteCheckedException {
+    @Override public void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
         if (errBytes != null)
-            error = marshaller.unmarshal(errBytes, loader);
+            error = ctx.unmarshal(errBytes);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
similarity index 64%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
index b555157..82232b6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
@@ -18,24 +18,21 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.marshaller.Marshaller;
 
-/**
- *
- */
-public interface MarshalableMessage extends CalciteMessage {
+/** */
+public interface ValueMessage extends MarshalableMessage {
     /**
-     * Prepares the message before sending.
-     *
-     * @param marshaller Marchaller.
+     * @return Wrapped value.
      */
-    void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException;
+    Object value();
 
-    /**
-     * Prepares the message before processing.
-     *
-     * @param marshaller Marchaller.
-     * @param loader Class loader.
-     */
-    void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws 
IgniteCheckedException;
+    /** {@inheritDoc} */
+    @Override default void prepareMarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override default void prepareUnmarshal(MarshallingContext ctx) throws 
IgniteCheckedException {
+        // No-op
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
index 2020dbc..3b0118a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java
@@ -23,12 +23,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
-
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -367,13 +366,13 @@ public class NodesMapping implements MarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) {
+    @Override public void prepareMarshal(MarshallingContext ctx) {
         if (assignments != null && assignments0 == null)
             assignments0 = Commons.transform(assignments, this::transform);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) {
+    @Override public void prepareUnmarshal(MarshallingContext ctx) {
         if (assignments0 != null && assignments == null)
             assignments = Commons.transform(assignments0, this::transform);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java
index 1475c91..3b0f929 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java
@@ -22,15 +22,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -208,7 +207,7 @@ public class FragmentDescription implements 
MarshalableMessage {
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareMarshal(Marshaller marshaller) {
+    @Override public void prepareMarshal(MarshallingContext ctx) {
         if (remoteSources0 == null && remoteSources != null) {
             remoteSources0 = U.newHashMap(remoteSources.size());
 
@@ -217,11 +216,11 @@ public class FragmentDescription implements 
MarshalableMessage {
         }
 
         if (targetMapping != null)
-            targetMapping.prepareMarshal(marshaller);
+            targetMapping.prepareMarshal(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader 
loader) {
+    @Override public void prepareUnmarshal(MarshallingContext ctx) {
         if (remoteSources == null && remoteSources0 != null) {
             remoteSources = U.newHashMap(remoteSources0.size());
 
@@ -230,6 +229,6 @@ public class FragmentDescription implements 
MarshalableMessage {
         }
 
         if (targetMapping != null)
-            targetMapping.prepareUnmarshal(marshaller, loader);
+            targetMapping.prepareUnmarshal(ctx);
     }
 }

Reply via email to