This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new ebc9f90 Message marshalling refactoring ebc9f90 is described below commit ebc9f9000cefd6c46300a63b5853eb1f31d228cd Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Tue Oct 6 12:49:52 2020 +0300 Message marshalling refactoring --- .../calcite/message/CalciteMessageFactory.java | 23 +++------- .../query/calcite/message/ErrorMessage.java | 10 ++--- ...ricRowMessage.java => GenericValueMessage.java} | 40 ++++++++--------- .../query/calcite/message/MarshalableMessage.java | 10 ++--- ...MessageService.java => MarshallingContext.java} | 51 ++++++++++------------ .../query/calcite/message/MessageService.java | 12 ----- .../query/calcite/message/MessageServiceImpl.java | 15 ++++--- .../query/calcite/message/MessageType.java | 3 +- .../query/calcite/message/QueryBatchMessage.java | 27 +++++------- .../query/calcite/message/QueryStartRequest.java | 13 +++--- .../query/calcite/message/QueryStartResponse.java | 10 ++--- .../{MarshalableMessage.java => ValueMessage.java} | 29 ++++++------ .../query/calcite/metadata/NodesMapping.java | 7 ++- .../query/calcite/prepare/FragmentDescription.java | 11 +++-- 14 files changed, 109 insertions(+), 152 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java index 61b77f3..9774f4e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.function.Supplier; - import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; /** @@ -35,23 +33,12 @@ public class CalciteMessageFactory implements MessageFactoryProvider { } /** - * Produces a row message. - * - * TODO In future a row is expected to implement Message interface. - */ - public static Message asMessage(Object row) { - return new GenericRowMessage(row); - } - - /** - * Produces a row from a message. - * - * TODO In future a row is expected to implement Message interface. + * Produces a value message. */ - public static Object asRow(Message mRow) { - if (mRow instanceof GenericRowMessage) - return ((GenericRowMessage) mRow).row(); + public static ValueMessage asMessage(Object val) { + if (val == null) + return null; - throw new AssertionError("Unexpected message type. [message=" + mRow + "]"); + return new GenericValueMessage(val); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java index de45c18..6d09c09 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java @@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.nio.ByteBuffer; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -163,12 +161,12 @@ public class ErrorMessage implements MarshalableMessage { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { - errBytes = marshaller.marshal(err); + @Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { + errBytes = ctx.marshal(err); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException { - err = marshaller.unmarshal(errBytes, loader); + @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { + err = ctx.unmarshal(errBytes); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java similarity index 71% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java index ae71e6e..f4c8d76 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericRowMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java @@ -18,49 +18,47 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.nio.ByteBuffer; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class GenericRowMessage implements MarshalableMessage { +public final class GenericValueMessage implements ValueMessage { /** */ @GridDirectTransient - private Object row; + private Object val; /** */ - private byte[] serRow; + private byte[] serialized; /** */ - public GenericRowMessage() { + public GenericValueMessage() { } /** */ - public GenericRowMessage(Object row) { - this.row = row; + public GenericValueMessage(Object val) { + this.val = val; } - /** */ - public Object row() { - return row; + /** {@inheritDoc} */ + @Override public Object value() { + return val; } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { - if (row != null && serRow == null) - serRow = marshaller.marshal(row); + @Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { + if (val != null && serialized == null) + serialized = ctx.marshal(val); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException { - if (serRow != null && row == null) - row = marshaller.unmarshal(serRow, loader); + @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { + if (serialized != null && val == null) + val = ctx.unmarshal(serialized); } /** {@inheritDoc} */ @@ -76,7 +74,7 @@ public class GenericRowMessage implements MarshalableMessage { switch (writer.state()) { case 0: - if (!writer.writeByteArray("serRow", serRow)) + if (!writer.writeByteArray("serialized", serialized)) return false; writer.incrementState(); @@ -95,7 +93,7 @@ public class GenericRowMessage implements MarshalableMessage { switch (reader.state()) { case 0: - serRow = reader.readByteArray("serRow"); + serialized = reader.readByteArray("serialized"); if (!reader.isLastRead()) return false; @@ -104,12 +102,12 @@ public class GenericRowMessage implements MarshalableMessage { } - return reader.afterMessageRead(GenericRowMessage.class); + return reader.afterMessageRead(GenericValueMessage.class); } /** {@inheritDoc} */ @Override public MessageType type() { - return MessageType.GENERIC_ROW_MESSAGE; + return MessageType.GENERIC_VALUE_MESSAGE; } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java index b555157..26db96b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.calcite.message; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.marshaller.Marshaller; /** * @@ -27,15 +26,14 @@ public interface MarshalableMessage extends CalciteMessage { /** * Prepares the message before sending. * - * @param marshaller Marchaller. + * @param ctx Marshaling context. */ - void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException; + void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException; /** * Prepares the message before processing. * - * @param marshaller Marchaller. - * @param loader Class loader. + * @param ctx Marshaling context. */ - void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException; + void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java similarity index 59% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java index 1376c00..1599ea8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshallingContext.java @@ -17,47 +17,42 @@ package org.apache.ignite.internal.processors.query.calcite.message; -import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.query.calcite.util.Service; import org.apache.ignite.marshaller.Marshaller; +import org.jetbrains.annotations.Nullable; -/** - * - */ -public interface MessageService extends Service { +/** */ +public interface MarshallingContext { /** - * Sends a message to given node. - * - * @param nodeId Node ID. - * @param msg Message. + * @return Message marshaller. */ - void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException; + Marshaller marshaller(); /** - * Checks whether a node with given ID is alive. - * - * @param nodeId Node ID. - * @return {@code True} if node is alive. + * @return Message class loader. */ - boolean alive(UUID nodeId); + ClassLoader classLoader(); /** - * Registers a listener for messages of a given type. + * Unmarshals object from byte array. * - * @param lsnr Listener. - * @param type Message type. + * @param <T> Type of unmarshalled object. + * @param bytes Byte array. + * @return Unmarshalled object. + * @throws IgniteCheckedException If unmarshalling failed. */ - void register(MessageListener lsnr, MessageType type); + default <T> T unmarshal(byte[] bytes) throws IgniteCheckedException { + return marshaller().unmarshal(bytes, classLoader()); + } /** - * @return Message marshaller. - */ - Marshaller marshaller(); - - /** - * @return Message class loader. + * Marshals object to byte array. + * + * @param obj Object to marshal. {@code null} object will be marshaled to binary {@code null} representation. + * @return Byte array. + * @throws IgniteCheckedException If marshalling failed. */ - ClassLoader classLoader(); + default byte[] marshal(@Nullable Object obj) throws IgniteCheckedException { + return marshaller().marshal(obj); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java index 1376c00..96b2aee 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java @@ -18,10 +18,8 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.util.Service; -import org.apache.ignite.marshaller.Marshaller; /** * @@ -50,14 +48,4 @@ public interface MessageService extends Service { * @param type Message type. */ void register(MessageListener lsnr, MessageType type); - - /** - * @return Message marshaller. - */ - Marshaller marshaller(); - - /** - * @return Message class loader. - */ - ClassLoader classLoader(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java index 1061c6e..c3b8cc2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java @@ -21,7 +21,6 @@ import java.util.EnumMap; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; @@ -45,20 +44,26 @@ import org.apache.ignite.plugin.extensions.communication.Message; /** * */ -public class MessageServiceImpl extends AbstractService implements MessageService { +public class MessageServiceImpl extends AbstractService implements MessageService, MarshallingContext { /** */ private final GridMessageListener msgLsnr; + /** */ private UUID localNodeId; + /** */ private GridIoManager ioManager; + /** */ private ClassLoader classLoader; + /** */ private QueryTaskExecutor taskExecutor; + /** */ private FailureProcessor failureProcessor; + /** */ private Marshaller marsh; /** */ @@ -222,9 +227,9 @@ public class MessageServiceImpl extends AbstractService implements MessageServic protected void prepareMarshal(Message msg) throws IgniteCheckedException { try { if (msg instanceof MarshalableMessage) - ((MarshalableMessage) msg).prepareMarshal(marshaller()); + ((MarshalableMessage) msg).prepareMarshal(this); } - catch (IgniteCheckedException e) { + catch (Exception e) { failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); throw e; @@ -235,7 +240,7 @@ public class MessageServiceImpl extends AbstractService implements MessageServic protected void prepareUnmarshal(Message msg) throws IgniteCheckedException { try { if (msg instanceof MarshalableMessage) - ((MarshalableMessage) msg).prepareUnmarshal(marshaller(), classLoader()); + ((MarshalableMessage) msg).prepareUnmarshal(this); } catch (Exception e) { failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java index 6979b06..858a0ce 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.function.Supplier; - import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription; @@ -33,7 +32,7 @@ public enum MessageType { QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new), QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new), QUERY_OUTBOX_CANCEL_MESSAGE(306, OutboxCloseMessage::new), - GENERIC_ROW_MESSAGE(307, GenericRowMessage::new), + GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new), NODES_MAPPING(350, NodesMapping::new), FRAGMENT_DESCRIPTION(351, FragmentDescription::new); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java index 627c6d6..7b5c997 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java @@ -21,12 +21,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -55,8 +52,8 @@ public class QueryBatchMessage implements MarshalableMessage, ExecutionContextAw private List<Object> rows; /** */ - @GridDirectCollection(Message.class) - private List<Message> mRows; + @GridDirectCollection(ValueMessage.class) + private List<ValueMessage> mRows; /** */ public QueryBatchMessage() { @@ -111,36 +108,36 @@ public class QueryBatchMessage implements MarshalableMessage, ExecutionContextAw } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { + @Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { if (mRows != null || rows == null) return; mRows = new ArrayList<>(rows.size()); for (Object row : rows) { - Message mRow = CalciteMessageFactory.asMessage(row); + ValueMessage mRow = CalciteMessageFactory.asMessage(row); + + assert mRow != null; - if (mRow instanceof MarshalableMessage) - ((MarshalableMessage) mRow).prepareMarshal(marshaller); + mRow.prepareMarshal(ctx); mRows.add(mRow); } } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException { + @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { if (rows != null || mRows == null) return; rows = new ArrayList<>(mRows.size()); - for (Message mRow : mRows) { - if (mRow instanceof MarshalableMessage) - ((MarshalableMessage) mRow).prepareUnmarshal(marshaller, loader); + for (ValueMessage mRow : mRows) { + assert mRow != null; - Object row = CalciteMessageFactory.asRow(mRow); + mRow.prepareUnmarshal(ctx); - rows.add(row); + rows.add(mRow.value()); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java index 8b9bc45..470ac3e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -115,19 +114,19 @@ public class QueryStartRequest implements MarshalableMessage, ExecutionContextAw } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { + @Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { if (paramsBytes == null && params != null) - paramsBytes = marshaller.marshal(params); + paramsBytes = ctx.marshal(params); - fragmentDesc.prepareMarshal(marshaller); + fragmentDesc.prepareMarshal(ctx); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader ldr) throws IgniteCheckedException { + @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { if (params == null && paramsBytes != null) - params = marshaller.unmarshal(paramsBytes, ldr); + params = ctx.unmarshal(paramsBytes); - fragmentDesc.prepareUnmarshal(marshaller, ldr); + fragmentDesc.prepareUnmarshal(ctx); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java index 3118b43..5464f9d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java @@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.nio.ByteBuffer; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -80,15 +78,15 @@ public class QueryStartResponse implements MarshalableMessage { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException { + @Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { if (error != null) - errBytes = marshaller.marshal(error); + errBytes = ctx.marshal(error); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException { + @Override public void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { if (errBytes != null) - error = marshaller.unmarshal(errBytes, loader); + error = ctx.unmarshal(errBytes); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java similarity index 64% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java index b555157..82232b6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MarshalableMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java @@ -18,24 +18,21 @@ package org.apache.ignite.internal.processors.query.calcite.message; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.marshaller.Marshaller; -/** - * - */ -public interface MarshalableMessage extends CalciteMessage { +/** */ +public interface ValueMessage extends MarshalableMessage { /** - * Prepares the message before sending. - * - * @param marshaller Marchaller. + * @return Wrapped value. */ - void prepareMarshal(Marshaller marshaller) throws IgniteCheckedException; + Object value(); - /** - * Prepares the message before processing. - * - * @param marshaller Marchaller. - * @param loader Class loader. - */ - void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) throws IgniteCheckedException; + /** {@inheritDoc} */ + @Override default void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException { + // No-op + } + + /** {@inheritDoc} */ + @Override default void prepareUnmarshal(MarshallingContext ctx) throws IgniteCheckedException { + // No-op + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java index 2020dbc..3b0118a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/NodesMapping.java @@ -23,12 +23,12 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.UUID; - import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage; +import org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.util.Commons; @@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -367,13 +366,13 @@ public class NodesMapping implements MarshalableMessage { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) { + @Override public void prepareMarshal(MarshallingContext ctx) { if (assignments != null && assignments0 == null) assignments0 = Commons.transform(assignments, this::transform); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) { + @Override public void prepareUnmarshal(MarshallingContext ctx) { if (assignments0 != null && assignments == null) assignments = Commons.transform(assignments0, this::transform); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java index 1475c91..3b0f929 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentDescription.java @@ -22,15 +22,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage; +import org.apache.ignite.internal.processors.query.calcite.message.MarshallingContext; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -208,7 +207,7 @@ public class FragmentDescription implements MarshalableMessage { } /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marshaller) { + @Override public void prepareMarshal(MarshallingContext ctx) { if (remoteSources0 == null && remoteSources != null) { remoteSources0 = U.newHashMap(remoteSources.size()); @@ -217,11 +216,11 @@ public class FragmentDescription implements MarshalableMessage { } if (targetMapping != null) - targetMapping.prepareMarshal(marshaller); + targetMapping.prepareMarshal(ctx); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(Marshaller marshaller, ClassLoader loader) { + @Override public void prepareUnmarshal(MarshallingContext ctx) { if (remoteSources == null && remoteSources0 != null) { remoteSources = U.newHashMap(remoteSources0.size()); @@ -230,6 +229,6 @@ public class FragmentDescription implements MarshalableMessage { } if (targetMapping != null) - targetMapping.prepareUnmarshal(marshaller, loader); + targetMapping.prepareUnmarshal(ctx); } }