This is an automated email from the ASF dual-hosted git repository.
anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 14e949855d9 IGNITE-28334 Replace CalciteMarshalableMessage with
MarshallableMessage (#13022)
14e949855d9 is described below
commit 14e949855d99631eaa7e05782b599d28fbca2d22
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Apr 20 19:23:29 2026 +0300
IGNITE-28334 Replace CalciteMarshalableMessage with MarshallableMessage
(#13022)
---
.../query/calcite/exec/ExchangeServiceImpl.java | 10 +--
...java => CalciteContextMarshallableMessage.java} | 10 +--
.../query/calcite/message/CalciteErrorMessage.java | 54 ++-----------
.../query/calcite/message/CalciteMessage.java | 30 -------
.../calcite/message/CalciteMessageFactory.java | 25 +++---
.../calcite/message/ExecutionContextAware.java | 3 +-
.../query/calcite/message/GenericValueMessage.java | 28 +++----
.../query/calcite/message/MessageListener.java | 3 +-
.../query/calcite/message/MessageService.java | 3 +-
.../query/calcite/message/MessageServiceImpl.java | 35 ++++----
.../query/calcite/message/MessageType.java | 69 +++++++---------
.../message/QueryBatchAcknowledgeMessage.java | 5 --
.../query/calcite/message/QueryBatchMessage.java | 60 ++------------
.../query/calcite/message/QueryCloseMessage.java | 8 +-
...oseMessage.java => QueryInboxCloseMessage.java} | 12 +--
.../query/calcite/message/QueryStartRequest.java | 35 ++++----
.../query/calcite/message/QueryStartResponse.java | 8 +-
.../query/calcite/message/QueryTxEntry.java | 15 ++--
.../query/calcite/message/ValueMessage.java | 39 ---------
.../query/calcite/metadata/ColocationGroup.java | 17 ++--
.../calcite/metadata/FragmentDescription.java | 33 +++-----
.../query/calcite/metadata/FragmentMapping.java | 25 +-----
.../calcite/exec/rel/AbstractExecutionTest.java | 13 +--
.../calcite/exec/rel/ContinuousExecutionTest.java | 6 ++
.../query/calcite/message/TestIoManager.java | 3 +-
.../query/calcite/planner/AbstractPlannerTest.java | 13 +--
.../query/calcite/planner/PlanExecutionTest.java | 11 +++
.../ignite/internal/CoreMessagesProvider.java | 88 +++++++-------------
.../org/apache/ignite/internal/IgniteKernal.java | 16 ++--
.../managers/communication/GridIoManager.java | 9 +--
...AbstractMarshallableMessageFactoryProvider.java | 93 ++++++++++++++++++++++
.../direct/DirectMarshallingMessagesTest.java | 2 +-
.../IgniteCoreMessagesSerializationTest.java | 2 +-
.../communication/CompressedMessageTest.java | 2 +-
...niteCacheContinuousQueryImmutableEntryTest.java | 2 +-
.../GridAbstractCommunicationSelfTest.java | 2 +-
...pCommunicationSpiConcurrentConnectSelfTest.java | 2 +-
.../tcp/GridTcpCommunicationSpiConfigSelfTest.java | 2 +-
...idTcpCommunicationSpiMultithreadedSelfTest.java | 2 +-
...GridTcpCommunicationSpiRecoveryAckSelfTest.java | 2 +-
.../GridTcpCommunicationSpiRecoverySelfTest.java | 2 +-
...TcpCommunicationRecoveryAckClosureSelfTest.java | 2 +-
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 2 +-
.../ignite/testframework/GridSpiTestContext.java | 2 +-
.../ignite/testframework/junits/IgniteMock.java | 3 +-
45 files changed, 318 insertions(+), 490 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 039b14ed9e9..ebf5c85c533 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -32,12 +32,12 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
-import
org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryInboxCloseMessage;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -165,7 +165,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId,
long exchangeId) throws IgniteCheckedException {
- messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId,
exchangeId));
+ messageService().send(nodeId, new QueryInboxCloseMessage(qryId,
fragmentId, exchangeId));
}
/** {@inheritDoc} */
@@ -188,8 +188,8 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void init() {
- messageService().register((n, m) -> onMessage(n,
(InboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
- messageService().register((n, m) -> onMessage(n,
(QueryBatchAcknowledgeMessage)m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
+ messageService().register((n, m) -> onMessage(n,
(QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
+ messageService().register((n, m) -> onMessage(n,
(QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n,
(QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
messageService().register((n, m) -> onMessage(n,
(QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
}
@@ -221,7 +221,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
}
/** */
- protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
+ protected void onMessage(UUID nodeId, QueryInboxCloseMessage msg) {
Collection<Inbox<?>> inboxes =
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
if (!F.isEmpty(inboxes)) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
similarity index 78%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
index b671639ebd6..f6ba42de959 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java
@@ -19,11 +19,10 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.plugin.extensions.communication.Message;
-/**
- *
- */
-public interface CalciteMarshalableMessage extends CalciteMessage {
+/** A Calcite engine related message which requires marshalling with context.
*/
+public interface CalciteContextMarshallableMessage extends Message {
/**
* Prepares the message before sending.
*
@@ -35,6 +34,7 @@ public interface CalciteMarshalableMessage extends
CalciteMessage {
* Prepares the message before processing.
*
* @param ctx Cache shared context.
+ * @param clsLdr Class loader.
*/
- void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws
IgniteCheckedException;
+ void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader clsLdr)
throws IgniteCheckedException;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
index ba20b2526b5..9d62635cc22 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java
@@ -18,17 +18,11 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
-/**
- *
- */
-public class CalciteErrorMessage implements CalciteMarshalableMessage {
+/** */
+public class CalciteErrorMessage extends ErrorMessage {
/** */
@Order(0)
UUID qryId;
@@ -37,14 +31,6 @@ public class CalciteErrorMessage implements
CalciteMarshalableMessage {
@Order(1)
long fragmentId;
- /** Error bytes. */
- @Order(2)
- @GridToStringExclude
- @Nullable public byte[] errBytes;
-
- /** Error. */
- private @Nullable Throwable err;
-
/** */
public CalciteErrorMessage() {
// No-op.
@@ -52,47 +38,21 @@ public class CalciteErrorMessage implements
CalciteMarshalableMessage {
/** */
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
+ super(err);
+
assert err != null;
this.qryId = qryId;
this.fragmentId = fragmentId;
- this.err = err;
}
- /**
- * @return Query ID.
- */
+ /** @return Query ID. */
public UUID queryId() {
return qryId;
}
- /**
- * @return Fragment ID.
- */
+ /** @return Fragment ID. */
public long fragmentId() {
return fragmentId;
}
-
- /** */
- public @Nullable Throwable error() {
- return err;
- }
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_ERROR_MESSAGE;
- }
-
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (err != null)
- errBytes = U.marshal(ctx.marshaller(), err);
- }
-
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (errBytes != null)
- err = U.unmarshal(ctx.marshaller(), errBytes,
U.resolveClassLoader(ctx.cache().context().gridConfig()));
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
deleted file mode 100644
index d23affbb5c3..00000000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import org.apache.ignite.plugin.extensions.communication.Message;
-
-/**
- *
- */
-public interface CalciteMessage extends Message {
- /**
- * @return Message type.
- */
- MessageType type();
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
index 1d707cf0c85..f3a9d6f431d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
@@ -17,28 +17,21 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.function.Supplier;
+import org.apache.ignite.internal.MarshallableMessage;
+import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
/**
* Message factory.
*/
-public class CalciteMessageFactory implements MessageFactoryProvider {
+public class CalciteMessageFactory extends
AbstractMarshallableMessageFactoryProvider {
/** {@inheritDoc} */
- @SuppressWarnings({"unchecked", "rawtypes"})
@Override public void registerAll(MessageFactory factory) {
- for (MessageType type : MessageType.values())
- factory.register(type.directType(), (Supplier)type.factory(),
type.serializer());
- }
-
- /**
- * Produces a value message.
- */
- public static ValueMessage asMessage(Object val) {
- if (val == null)
- return null;
-
- return new GenericValueMessage(val);
+ for (MessageType type : MessageType.values()) {
+ if
(MarshallableMessage.class.isAssignableFrom(type.messageClass()))
+ register(factory, type.messageClass(), type.directType(),
schemaAwareMarsh, resolvedClsLdr);
+ else
+ register(factory, type.messageClass(), type.directType(),
dfltMarsh, dftlClsLdr);
+ }
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
index 883f504af47..5710a68fc70 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Execution context is used to determine a stripe where to process a message.
*/
-public interface ExecutionContextAware extends CalciteMessage {
+public interface ExecutionContextAware extends Message {
/**
* @return Query ID.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
index b03659defa9..40849bab6a7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
@@ -18,14 +18,13 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
-/**
- *
- */
-public final class GenericValueMessage implements ValueMessage {
+/** */
+public final class GenericValueMessage implements MarshallableMessage {
/** */
private Object val;
@@ -35,7 +34,7 @@ public final class GenericValueMessage implements
ValueMessage {
/** */
public GenericValueMessage() {
-
+ // No-op.
}
/** */
@@ -43,25 +42,22 @@ public final class GenericValueMessage implements
ValueMessage {
this.val = val;
}
- /** {@inheritDoc} */
- @Override public Object value() {
+ /** */
+ public Object value() {
return val;
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
if (val != null && serialized == null)
- serialized = U.marshal(ctx, val);
+ serialized = U.marshal(marsh, val);
}
/** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
if (serialized != null && val == null)
- val = U.unmarshal(ctx, serialized,
U.resolveClassLoader(ctx.gridConfig()));
- }
+ val = U.unmarshal(marsh, serialized, clsLdr);
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.GENERIC_VALUE_MESSAGE;
+ serialized = null;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
index 61fc8977157..68e2d2710bd 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
@@ -27,5 +28,5 @@ public interface MessageListener {
* @param nodeId Sender node ID.
* @param msg Message.
*/
- void onMessage(UUID nodeId, CalciteMessage msg);
+ void onMessage(UUID nodeId, Message msg);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index 96b2aee7c2f..caa1de990fe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
@@ -31,7 +32,7 @@ public interface MessageService extends Service {
* @param nodeId Node ID.
* @param msg Message.
*/
- void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
+ void send(UUID nodeId, Message msg) throws IgniteCheckedException;
/**
* Checks whether a node with given ID is alive.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index 2cea51aa5cf..dd421bf1f6c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -17,7 +17,8 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -49,6 +50,9 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** */
private final GridCacheSharedContext<?, ?> ctx;
+ /** */
+ private final ClassLoader clsLdr;
+
/** */
private UUID localNodeId;
@@ -62,14 +66,15 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
private FailureProcessor failureProcessor;
/** */
- private EnumMap<MessageType, MessageListener> lsnrs;
+ private Map<Short, MessageListener> lsnrs;
/** */
public MessageServiceImpl(GridKernalContext ctx) {
super(ctx);
this.ctx = ctx.cache().context();
- this.ioManager = ctx.io();
+ clsLdr = U.resolveClassLoader(ctx.config());
+ ioManager = ctx.io();
msgLsnr = this::onMessage;
}
@@ -146,7 +151,7 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
}
/** {@inheritDoc} */
- @Override public void send(UUID nodeId, CalciteMessage msg) throws
IgniteCheckedException {
+ @Override public void send(UUID nodeId, Message msg) throws
IgniteCheckedException {
if (localNodeId().equals(nodeId))
onMessage(nodeId, msg);
else {
@@ -159,9 +164,9 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** {@inheritDoc} */
@Override public void register(MessageListener lsnr, MessageType type) {
if (lsnrs == null)
- lsnrs = new EnumMap<>(MessageType.class);
+ lsnrs = new HashMap<>();
- MessageListener old = lsnrs.put(type, lsnr);
+ MessageListener old = lsnrs.put(type.directType(), lsnr);
assert old == null : old;
}
@@ -179,8 +184,8 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** */
protected void prepareMarshal(Message msg) throws IgniteCheckedException {
try {
- if (msg instanceof CalciteMarshalableMessage)
- ((CalciteMarshalableMessage)msg).prepareMarshal(ctx);
+ if (msg instanceof CalciteContextMarshallableMessage)
+ ((CalciteContextMarshallableMessage)msg).prepareMarshal(ctx);
}
catch (Exception e) {
failureProcessor().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -192,8 +197,8 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** */
protected void prepareUnmarshal(Message msg) throws IgniteCheckedException
{
try {
- if (msg instanceof CalciteMarshalableMessage)
- ((CalciteMarshalableMessage)msg).prepareUnmarshal(ctx);
+ if (msg instanceof CalciteContextMarshallableMessage)
+ ((CalciteContextMarshallableMessage)msg).finishUnmarshal(ctx,
clsLdr);
}
catch (Exception e) {
failureProcessor().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
@@ -203,7 +208,7 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
}
/** */
- protected void onMessage(UUID nodeId, CalciteMessage msg) {
+ protected void onMessage(UUID nodeId, Message msg) {
if (msg instanceof ExecutionContextAware) {
ExecutionContextAware msg0 = (ExecutionContextAware)msg;
taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () ->
onMessageInternal(nodeId, msg));
@@ -218,16 +223,16 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** */
private void onMessage(UUID nodeId, Object msg, byte plc) {
- if (msg instanceof CalciteMessage)
- onMessage(nodeId, (CalciteMessage)msg);
+ if (msg instanceof Message &&
MessageType.isCalciteMessage((Message)msg))
+ onMessage(nodeId, (Message)msg);
}
/** */
- private void onMessageInternal(UUID nodeId, CalciteMessage msg) {
+ private void onMessageInternal(UUID nodeId, Message msg) {
try {
prepareUnmarshal(msg);
- MessageListener lsnr =
Objects.requireNonNull(lsnrs.get(msg.type()));
+ MessageListener lsnr =
Objects.requireNonNull(lsnrs.get(msg.directType()));
lsnr.onMessage(nodeId, msg);
}
catch (IgniteCheckedException e) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index d0ce95c54da..045dac80f69 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -17,93 +17,82 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.function.Supplier;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroupSerializer;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescriptionSerializer;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingSerializer;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.Message;
-/**
- *
- */
+/** */
public enum MessageType {
/** */
- QUERY_START_REQUEST(300, QueryStartRequest::new, new
QueryStartRequestSerializer()),
+ QUERY_START_REQUEST(300, QueryStartRequest.class),
/** */
- QUERY_START_RESPONSE(301, QueryStartResponse::new, new
QueryStartResponseSerializer()),
+ QUERY_START_RESPONSE(301, QueryStartResponse.class),
/** */
- QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new
CalciteErrorMessageSerializer()),
+ QUERY_ERROR_MESSAGE(302, CalciteErrorMessage.class),
/** */
- QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new
QueryBatchMessageSerializer()),
+ QUERY_BATCH_MESSAGE(303, QueryBatchMessage.class),
/** */
- QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new, new
QueryBatchAcknowledgeMessageSerializer()),
+ QUERY_BATCH_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage.class),
/** */
- QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new, new
InboxCloseMessageSerializer()),
+ QUERY_INBOX_CANCEL_MESSAGE(305, QueryInboxCloseMessage.class),
/** */
- QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new, new
QueryCloseMessageSerializer()),
+ QUERY_CLOSE_MESSAGE(306, QueryCloseMessage.class),
/** */
- GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new, new
GenericValueMessageSerializer()),
+ GENERIC_VALUE_MESSAGE(307, GenericValueMessage.class),
/** */
- FRAGMENT_MAPPING(350, FragmentMapping::new, new
FragmentMappingSerializer()),
+ FRAGMENT_MAPPING(350, FragmentMapping.class),
/** */
- COLOCATION_GROUP(351, ColocationGroup::new, new
ColocationGroupSerializer()),
+ COLOCATION_GROUP(351, ColocationGroup.class),
/** */
- FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new
FragmentDescriptionSerializer()),
+ FRAGMENT_DESCRIPTION(352, FragmentDescription.class),
/** */
- QUERY_TX_ENTRY(353, QueryTxEntry::new, new QueryTxEntrySerializer());
+ QUERY_TX_ENTRY(353, QueryTxEntry.class);
/** */
- private final int directType;
+ private final short directType;
/** */
- private final Supplier<CalciteMessage> factory;
-
- /** */
- private final MessageSerializer serializer;
+ private final Class<? extends Message> msgCls;
/**
* @param directType Direct type.
- * @param factory Message factory.
- * @param serializer Message serializer.
*/
- MessageType(int directType, Supplier<CalciteMessage> factory,
MessageSerializer serializer) {
- this.directType = directType;
- this.factory = factory;
- this.serializer = serializer;
+ MessageType(int directType, Class<? extends Message> msgCls) {
+ this.directType = (short)directType;
+ this.msgCls = msgCls;
}
- /**
- * @return Message factory.
- */
- public Supplier<CalciteMessage> factory() {
- return factory;
+ /** */
+ static boolean isCalciteMessage(Message msg) {
+ MessageType[] values = values();
+ short msgType = msg.directType();
+
+ return msgType >= values[0].directType() && msgType <=
values[values.length - 1].directType();
}
/**
* @return Message direct type.
*/
public short directType() {
- return (short)directType;
+ return directType;
}
/**
- * @return Message serializer.
+ * @return Message class.
*/
- public MessageSerializer serializer() {
- return serializer;
+ public Class<? extends Message> messageClass() {
+ return msgCls;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
index ec7b04e05ad..acf2b31bedc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java
@@ -76,9 +76,4 @@ public class QueryBatchAcknowledgeMessage implements
ExecutionContextAware {
public int batchId() {
return batchId;
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_ACKNOWLEDGE_MESSAGE;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
index c21297c3a5c..487baff9fdc 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
@@ -17,17 +17,13 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-/**
- *
- */
-public class QueryBatchMessage implements CalciteMarshalableMessage,
ExecutionContextAware {
+/** */
+public class QueryBatchMessage implements ExecutionContextAware {
/** */
@Order(0)
UUID qryId;
@@ -48,15 +44,13 @@ public class QueryBatchMessage implements
CalciteMarshalableMessage, ExecutionCo
@Order(4)
boolean last;
- /** */
- private List<Object> rows;
-
/** */
@Order(5)
- List<ValueMessage> mRows;
+ List<GenericValueMessage> mRows;
/** */
public QueryBatchMessage() {
+ // No-op.
}
/** */
@@ -66,7 +60,8 @@ public class QueryBatchMessage implements
CalciteMarshalableMessage, ExecutionCo
this.exchangeId = exchangeId;
this.batchId = batchId;
this.last = last;
- this.rows = rows;
+
+ mRows = rows.stream().map(o -> o == null ? null : new
GenericValueMessage(o)).collect(Collectors.toList());
}
/** {@inheritDoc} */
@@ -104,45 +99,6 @@ public class QueryBatchMessage implements
CalciteMarshalableMessage, ExecutionCo
* @return Rows.
*/
public List<Object> rows() {
- return rows;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (mRows != null || rows == null)
- return;
-
- mRows = new ArrayList<>(rows.size());
-
- for (Object row : rows) {
- ValueMessage mRow = CalciteMessageFactory.asMessage(row);
-
- assert mRow != null;
-
- mRow.prepareMarshal(ctx);
-
- mRows.add(mRow);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (rows != null || mRows == null)
- return;
-
- rows = new ArrayList<>(mRows.size());
-
- for (ValueMessage mRow : mRows) {
- assert mRow != null;
-
- mRow.prepareUnmarshal(ctx);
-
- rows.add(mRow.value());
- }
- }
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_BATCH_MESSAGE;
+ return
mRows.stream().map(m -> m == null ? null : m.value()).collect(Collectors.toList()); // Null wrapper means null row.
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
index 6db451d285c..4972314aa0a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java
@@ -19,11 +19,12 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
-public class QueryCloseMessage implements CalciteMessage {
+public class QueryCloseMessage implements Message {
/** */
@Order(0)
UUID qryId;
@@ -44,9 +45,4 @@ public class QueryCloseMessage implements CalciteMessage {
public UUID queryId() {
return qryId;
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_CLOSE_MESSAGE;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
similarity index 83%
rename from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
index 19638738da0..532c024d657 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java
@@ -19,11 +19,12 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
-public class InboxCloseMessage implements CalciteMessage {
+public class QueryInboxCloseMessage implements Message {
/** */
@Order(0)
UUID qryId;
@@ -37,12 +38,12 @@ public class InboxCloseMessage implements CalciteMessage {
long exchangeId;
/** */
- public InboxCloseMessage() {
+ public QueryInboxCloseMessage() {
// No-op.
}
/** */
- public InboxCloseMessage(UUID qryId, long fragmentId, long exchangeId) {
+ public QueryInboxCloseMessage(UUID qryId, long fragmentId, long
exchangeId) {
this.qryId = qryId;
this.fragmentId = fragmentId;
this.exchangeId = exchangeId;
@@ -68,9 +69,4 @@ public class InboxCloseMessage implements CalciteMessage {
public long exchangeId() {
return exchangeId;
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_INBOX_CANCEL_MESSAGE;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index bc0817f2473..15d34e00fbb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -21,17 +21,19 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class QueryStartRequest implements CalciteMarshalableMessage,
ExecutionContextAware {
+public class QueryStartRequest implements MarshallableMessage,
CalciteContextMarshallableMessage, ExecutionContextAware {
/** */
@Order(0)
String schema;
@@ -116,7 +118,9 @@ public class QueryStartRequest implements
CalciteMarshalableMessage, ExecutionCo
}
/** */
- QueryStartRequest() {}
+ public QueryStartRequest() {
+ // No-op.
+ }
/**
* @return Schema name.
@@ -211,12 +215,13 @@ public class QueryStartRequest implements
CalciteMarshalableMessage, ExecutionCo
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
if (paramsBytes == null && params != null)
- paramsBytes = U.marshal(ctx, params);
-
- fragmentDesc.prepareMarshal(ctx);
+ paramsBytes = U.marshal(marsh, params);
+ }
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
if (qryTxEntries != null) {
for (QueryTxEntry e : qryTxEntries)
e.prepareMarshal(ctx);
@@ -224,22 +229,18 @@ public class QueryStartRequest implements
CalciteMarshalableMessage, ExecutionCo
}
/** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- ClassLoader ldr = U.resolveClassLoader(ctx.gridConfig());
-
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
if (params == null && paramsBytes != null)
- params = U.unmarshal(ctx, paramsBytes, ldr);
+ params = U.unmarshal(marsh, paramsBytes, clsLdr);
- fragmentDesc.prepareUnmarshal(ctx);
+ paramsBytes = null;
+ }
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader clsLdr) throws IgniteCheckedException {
if (qryTxEntries != null) {
for (QueryTxEntry e : qryTxEntries)
- e.prepareUnmarshal(ctx, ldr);
+ e.finishUnmarshal(ctx, clsLdr);
}
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_START_REQUEST;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
index f908cb61383..afc9d7c7812 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
@@ -20,12 +20,13 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class QueryStartResponse implements CalciteMessage {
+public class QueryStartResponse implements Message {
/** */
@Order(0)
UUID qryId;
@@ -70,9 +71,4 @@ public class QueryStartResponse implements CalciteMessage {
public @Nullable Throwable error() {
return ErrorMessage.error(errMsg);
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_START_RESPONSE;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
index 7961f589d83..36f2d9dceee 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
@@ -38,7 +38,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext
* @see ExecutionContext#transactionChanges(int, int[], Function, Comparator)
* @see QueryStartRequest#queryTransactionEntries()
*/
-public class QueryTxEntry implements CalciteMessage {
+public class QueryTxEntry implements CalciteContextMarshallableMessage {
/** Cache id. */
@Order(0)
int cacheId;
@@ -95,8 +95,8 @@ public class QueryTxEntry implements CalciteMessage {
return ver;
}
- /** */
- public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws
IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
CacheObjectContext coctx =
ctx.cacheContext(cacheId).cacheObjectContext();
key.prepareMarshal(coctx);
@@ -105,8 +105,8 @@ public class QueryTxEntry implements CalciteMessage {
val.prepareMarshal(coctx);
}
- /** */
- public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader
ldr) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
CacheObjectContext coctx =
ctx.cacheContext(cacheId).cacheObjectContext();
key.finishUnmarshal(coctx, ldr);
@@ -114,9 +114,4 @@ public class QueryTxEntry implements CalciteMessage {
if (val != null)
val.finishUnmarshal(coctx, ldr);
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.QUERY_TX_ENTRY;
- }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
deleted file mode 100644
index 901bcbfafa4..00000000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-
-/** */
-public interface ValueMessage extends CalciteMarshalableMessage {
- /**
- * @return Wrapped value.
- */
- Object value();
-
- /** {@inheritDoc} */
- @Override default void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- // No-op
- }
-
- /** {@inheritDoc} */
- @Override default void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- // No-op
- }
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 3e4f8001589..7981d736953 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -30,19 +30,18 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
/** */
-public class ColocationGroup implements CalciteMarshalableMessage {
+public class ColocationGroup implements MarshallableMessage {
/** */
@Order(0)
long[] srcIds;
@@ -102,6 +101,7 @@ public class ColocationGroup implements
CalciteMarshalableMessage {
/** */
public ColocationGroup() {
+ // No-op.
}
/** */
@@ -314,12 +314,7 @@ public class ColocationGroup implements
CalciteMarshalableMessage {
}
/** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.COLOCATION_GROUP;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
if (assignments == null || primaryAssignment)
return;
@@ -348,7 +343,7 @@ public class ColocationGroup implements
CalciteMarshalableMessage {
}
/** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
if (F.isEmpty(marshalledAssignments))
return;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index 009a9c6c9e3..7560be273c1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -21,13 +21,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
+import org.apache.ignite.marshaller.Marshaller;
/** */
-public class FragmentDescription implements CalciteMarshalableMessage {
+public class FragmentDescription implements MarshallableMessage {
/** */
@Order(0)
long fragmentId;
@@ -46,6 +45,7 @@ public class FragmentDescription implements
CalciteMarshalableMessage {
/** */
public FragmentDescription() {
+ // No-op.
}
/** */
@@ -97,27 +97,14 @@ public class FragmentDescription implements
CalciteMarshalableMessage {
this.mapping = mapping;
}
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.FRAGMENT_DESCRIPTION;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (target != null) {
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (target != null)
target = target.explicitMapping();
-
- target.prepareMarshal(ctx);
- }
-
- if (mapping != null)
- mapping.prepareMarshal(ctx);
}
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- target.prepareUnmarshal(ctx);
-
- mapping.prepareUnmarshal(ctx);
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ // No-op.
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index 910b990476b..db8368e351d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -23,26 +23,24 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
/**
*
*/
-public class FragmentMapping implements CalciteMarshalableMessage {
+public class FragmentMapping implements Message {
/** */
@Order(0)
List<ColocationGroup> colocationGrps;
/** */
public FragmentMapping() {
+ // No-op.
}
/** */
@@ -174,21 +172,4 @@ public class FragmentMapping implements
CalciteMarshalableMessage {
return new FragmentMapping(Commons.transform(colocationGrps,
g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g));
}
-
- /** {@inheritDoc} */
- @Override public MessageType type() {
- return MessageType.FRAGMENT_MAPPING;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- for (ColocationGroup grp : colocationGrps)
- grp.prepareMarshal(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- for (ColocationGroup grp : colocationGrps)
- grp.prepareUnmarshal(ctx);
- }
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 6cf79d4f5df..e67cd18885e 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -55,7 +55,6 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlocki
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
-import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -389,7 +388,7 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void send(UUID nodeId, CalciteMessage msg) {
+ @Override public void send(UUID nodeId, Message msg) {
mgr.send(localNodeId(), nodeId, msg);
}
@@ -397,16 +396,6 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
@Override public boolean alive(UUID nodeId) {
return true;
}
-
- /** {@inheritDoc} */
- @Override protected void prepareMarshal(Message msg) {
- // No-op;
- }
-
- /** {@inheritDoc} */
- @Override protected void prepareUnmarshal(Message msg) {
- // No-op;
- }
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
index efd4bffc4ec..af835b4f0ad 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java
@@ -27,11 +27,14 @@ import java.util.UUID;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory;
import org.apache.ignite.internal.processors.query.calcite.trait.AllNodes;
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized;
@@ -94,6 +97,9 @@ public class ContinuousExecutionTest extends
AbstractExecutionTest {
@Override public void setup() throws Exception {
nodesCnt = remoteFragmentsCnt + 1;
super.setup();
+
+ // Register messages in Message#REGISTRATIONS and avoid a failure in
Message#directType().
+ new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new
CalciteMessageFactory()});
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
index 538b1f6426a..eaef5d2afe6 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.message;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
@@ -29,7 +30,7 @@ public class TestIoManager {
private final Map<UUID, MessageServiceImpl> srvcMap = new
ConcurrentHashMap<>();
/** */
- public void send(UUID senderId, UUID nodeId, CalciteMessage msg) {
+ public void send(UUID senderId, UUID nodeId, Message msg) {
srvcMap.get(nodeId).onMessage(senderId, msg);
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index b0310c799ce..e91e894e7c2 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -53,7 +53,6 @@ import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
-import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -740,7 +739,7 @@ public abstract class AbstractPlannerTest extends
GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override public void send(UUID nodeId, CalciteMessage msg) {
+ @Override public void send(UUID nodeId, Message msg) {
mgr.send(localNodeId(), nodeId, msg);
}
@@ -748,16 +747,6 @@ public abstract class AbstractPlannerTest extends
GridCommonAbstractTest {
@Override public boolean alive(UUID nodeId) {
return true;
}
-
- /** {@inheritDoc} */
- @Override protected void prepareMarshal(Message msg) {
- // No-op;
- }
-
- /** {@inheritDoc} */
- @Override protected void prepareUnmarshal(Message msg) {
- // No-op;
- }
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
index 11483e74181..75530ae0304 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
+import
org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl;
@@ -40,6 +41,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -63,6 +65,7 @@ import
org.apache.ignite.internal.processors.query.calcite.util.Commons;
import
org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor;
import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.internal.util.typedef.F;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.junit.Assert;
import org.junit.Test;
@@ -75,6 +78,14 @@ import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KE
*/
@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
public class PlanExecutionTest extends AbstractPlannerTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Register messages in Message#REGISTRATIONS and avoid a failure in
Message#directType().
+ new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new
CalciteMessageFactory()});
+ }
+
/**
* @throws Exception If failed.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index cb076954ad8..aa5d1e210cb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal;
-import java.lang.reflect.Constructor;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
@@ -40,6 +39,7 @@ import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyReque
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
+import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.processors.authentication.User;
import
org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
import
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
@@ -230,14 +230,10 @@ import
org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.distributed.FullMessage;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
import
org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
import
org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -278,7 +274,7 @@ import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa
import org.jetbrains.annotations.Nullable;
/** */
-public class CoreMessagesProvider implements MessageFactoryProvider {
+public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProvider {
/** Node ID message type. */
public static final short NODE_ID_MSG_TYPE = 11500;
@@ -291,15 +287,6 @@ public class CoreMessagesProvider implements
MessageFactoryProvider {
/** */
public static final short MAX_MESSAGE_ID = 15_000;
- /** Binary marshaller. */
- private final Marshaller schemaAwareMarhaller;
-
- /** Binary marshaller. */
- private final Marshaller schemaLessMarshaller;
-
- /** Resolved classloader. */
- private final ClassLoader resolvedClsLdr;
-
/** */
private short msgIdx;
@@ -307,14 +294,22 @@ public class CoreMessagesProvider implements
MessageFactoryProvider {
private @Nullable MessageFactory factory;
/**
- * @param schemaAwareMarhaller Schema-aware marshaller like {@link
BinaryMarshaller}.
- * @param schemaLessMarshaller Pure, schemaless marshaller like {@link
JdkMarshaller}.
- * @param resolvedClsLdr Resolved classloader.
+ * Default constructor for plugin purposes.
+ *
+ * @see #init(Marshaller, ClassLoader)
*/
- public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller
schemaLessMarshaller, ClassLoader resolvedClsLdr) {
- this.schemaAwareMarhaller = schemaAwareMarhaller;
- this.schemaLessMarshaller = schemaLessMarshaller;
- this.resolvedClsLdr = resolvedClsLdr;
+ public CoreMessagesProvider() {
+ // No-op.
+ }
+
+ /**
+ * Constructor that performs initialization itself, so {@link #init(Marshaller, ClassLoader)} need not be called.
+ *
+ * @param schemaAwareMarsh Schema-aware marshaller like {@link
BinaryMarshaller}.
+ * @param resolvedClsLdr Resolved (configured) class loader like {@link
IgniteConfiguration#setClassLoader(ClassLoader)}.
+ */
+ public CoreMessagesProvider(Marshaller schemaAwareMarsh, ClassLoader
resolvedClsLdr) {
+ init(schemaAwareMarsh, resolvedClsLdr);
}
/** The order is important. If wish to remove a message, put 'msgIdx++' on
its place. */
@@ -644,52 +639,23 @@ public class CoreMessagesProvider implements
MessageFactoryProvider {
assert msgIdx <= MAX_MESSAGE_ID;
}
- /** Registers message using {@link #schemaAwareMarhaller} and {@link
U#gridClassLoader()}. */
- private <T extends Message> void withSchema(Class<T> cls) {
- register(cls, schemaAwareMarhaller, U.gridClassLoader());
+ /** Registers message using {@link #dfltMarsh} and {@link #dftlClsLdr}. */
+ private <T extends Message> void withNoSchema(Class<T> cls) {
+ register(cls, dfltMarsh, dftlClsLdr);
}
- /** Registers message using {@link #schemaLessMarshaller} and {@link
U#gridClassLoader()}. */
- private <T extends Message> void withNoSchema(Class<T> cls) {
- register(cls, schemaLessMarshaller, U.gridClassLoader());
+ /** Registers message using {@link #schemaAwareMarsh} and {@link
#dftlClsLdr}. */
+ private <T extends Message> void withSchema(Class<T> cls) {
+ register(cls, schemaAwareMarsh, dftlClsLdr);
}
- /** Registers message using {@link #schemaLessMarshaller} and {@link
#resolvedClsLdr}. */
+ /** Registers message using {@link #dfltMarsh} and {@link
#resolvedClsLdr}. */
private <T extends Message> void withNoSchemaResolvedClassLoader(Class<T>
cls) {
- register(cls, schemaLessMarshaller, resolvedClsLdr);
+ register(cls, dfltMarsh, resolvedClsLdr);
}
/** Registers message using incrementing {@link #msgIdx} as the message
id/type. */
private <T extends Message> void register(Class<T> cls, Marshaller marsh,
ClassLoader clsLrd) {
- Constructor<T> ctor;
- MessageSerializer<T> serializer;
-
- try {
- ctor = cls.getConstructor();
-
- boolean marshallable =
MarshallableMessage.class.isAssignableFrom(cls);
-
- Class<?> serCls = Class.forName(cls.getName() + (marshallable ?
"MarshallableSerializer" : "Serializer"));
-
- serializer = marshallable
- ?
(MessageSerializer<T>)serCls.getConstructor(Marshaller.class,
ClassLoader.class).newInstance(marsh, clsLrd)
- : (MessageSerializer<T>)serCls.getConstructor().newInstance();
- }
- catch (Exception e) {
- throw new IgniteException("Failed to register message of type " +
cls.getSimpleName(), e);
- }
-
- factory.register(
- msgIdx++,
- () -> {
- try {
- return ctor.newInstance();
- }
- catch (Exception e) {
- throw new IgniteException("Failed to create message of
type " + cls.getSimpleName(), e);
- }
- },
- serializer
- );
+ register(factory, cls, msgIdx++, marsh, clsLrd);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 315130fe561..a27c0623eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -112,6 +112,7 @@ import
org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import
org.apache.ignite.internal.managers.systemview.IgniteConfigurationIterable;
import org.apache.ignite.internal.managers.tracing.GridTracingManager;
+import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.plugin.IgniteLogInfoProvider;
import org.apache.ignite.internal.plugin.IgniteLogInfoProviderImpl;
import org.apache.ignite.internal.processors.GridProcessor;
@@ -1098,7 +1099,7 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
startProcessor(new GridTaskProcessor(ctx));
startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
startProcessor(createComponent(IgniteRestProcessor.class,
ctx));
- startProcessor(new DataStreamProcessor(ctx));
+ startProcessor(new DataStreamProcessor<>(ctx));
startProcessor(new GridContinuousProcessor(ctx));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
@@ -1114,7 +1115,7 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
startTimer.finishGlobalStage("Start processors");
// Start plugins.
- for (PluginProvider provider : ctx.plugins().allProviders()) {
+ for (PluginProvider<?> provider :
ctx.plugins().allProviders()) {
ctx.add(new GridPluginComponent(provider));
provider.start(ctx.plugins().pluginContextForProvider(provider));
@@ -1255,7 +1256,7 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
}
// Start plugins.
- for (PluginProvider provider : ctx.plugins().allProviders())
+ for (PluginProvider<?> provider : ctx.plugins().allProviders())
provider.onIgniteStart();
if (recon)
@@ -1319,14 +1320,17 @@ public class IgniteKernal implements IgniteEx,
Externalizable {
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
- compMsgs.add(new CoreMessagesProvider(ctx.marshaller(),
ctx.marshallerContext().jdkMarshaller(),
- U.resolveClassLoader(ctx.config())));
+ compMsgs.add(new CoreMessagesProvider(ctx.marshaller(),
U.resolveClassLoader(ctx.config())));
for (IgniteComponentType compType : IgniteComponentType.values()) {
MessageFactoryProvider f = compType.messageFactory();
- if (f != null)
+ if (f != null) {
+ if (f instanceof AbstractMarshallableMessageFactoryProvider)
+
((AbstractMarshallableMessageFactoryProvider)f).init(ctx.marshaller(),
U.resolveClassLoader(ctx.config()));
+
compMsgs.add(f);
+ }
}
DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a9d0b451458..ef79d006969 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -346,9 +346,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS,
MAX_CLOSED_TOPICS, 0.75f, 256,
PER_SEGMENT_Q_OPTIMIZED_RMV);
- /** */
- private MessageFactory msgFactory;
-
/** */
private MessageFormatter formatter;
@@ -393,9 +390,9 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
* @return Message factory.
*/
public MessageFactory messageFactory() {
- assert msgFactory != null;
+ assert ctx.messageFactory() != null;
- return msgFactory;
+ return ctx.messageFactory();
}
/**
@@ -437,8 +434,6 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
};
}
- msgFactory = ctx.messageFactory();
-
CommunicationSpi<Object> spi = getSpi();
if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
new file mode 100644
index 00000000000..03a65b95b6e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.plugin;
+
+import java.lang.reflect.Constructor;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.Marshallers;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+
+/**
+ * An extension of {@link MessageFactoryProvider} that allows using the provided
schema-aware marshaller
+ * and resolved class loader to register a {@link MarshallableMessage}.
+ */
+public abstract class AbstractMarshallableMessageFactoryProvider implements
MessageFactoryProvider {
+ /** Default schema-less marshaller. */
+ protected final Marshaller dfltMarsh = Marshallers.jdk();
+
+ /** Default class loader. */
+ protected final ClassLoader dftlClsLdr = U.gridClassLoader();
+
+ /** Schema-aware marshaller like {@link BinaryMarshaller}. */
+ protected Marshaller schemaAwareMarsh;
+
+ /** Resolved (configured) class loader like {@link
IgniteConfiguration#setClassLoader(ClassLoader)}. */
+ protected ClassLoader resolvedClsLdr;
+
+ /**
+ * @param schemaAwareMarsh Schema-aware marshaller like {@link
BinaryMarshaller}.
+ * @param resolvedClsLdr Resolved (configured) class loader like {@link
IgniteConfiguration#setClassLoader(ClassLoader)}.
+ */
+ public void init(Marshaller schemaAwareMarsh, ClassLoader resolvedClsLdr) {
+ this.schemaAwareMarsh = schemaAwareMarsh;
+ this.resolvedClsLdr = resolvedClsLdr;
+ }
+
+ /** Registers a message, automatically creating its message supplier and
serializer. */
+ protected static <T extends Message> void register(MessageFactory factory,
Class<T> cls, short id, Marshaller marsh,
+ ClassLoader clsLrd) {
+ Constructor<T> ctor;
+ MessageSerializer<T> serializer;
+
+ try {
+ ctor = cls.getConstructor();
+
+ boolean marshallable =
MarshallableMessage.class.isAssignableFrom(cls);
+
+ Class<?> serCls = Class.forName(cls.getName() + (marshallable ?
"MarshallableSerializer" : "Serializer"));
+
+ serializer = marshallable
+ ?
(MessageSerializer<T>)serCls.getConstructor(Marshaller.class,
ClassLoader.class).newInstance(marsh, clsLrd)
+ : (MessageSerializer<T>)serCls.getConstructor().newInstance();
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to register message of type " +
cls.getSimpleName(), e);
+ }
+
+ factory.register(
+ id,
+ () -> {
+ try {
+ return ctor.newInstance();
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to create message of
type " + cls.getSimpleName(), e);
+ }
+ },
+ serializer
+ );
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
index ea37b6abe45..a4c91643889 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -46,7 +46,7 @@ public class DirectMarshallingMessagesTest extends
GridCommonAbstractTest {
/** Message factory. */
private final MessageFactory msgFactory =
new IgniteMessageFactoryImpl(new MessageFactoryProvider[] {
- new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
+ new CoreMessagesProvider(jdk(), U.gridClassLoader()),
factory -> factory.register(
TestNestedContainersMessage.TYPE,
TestNestedContainersMessage::new,
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
index 21f22330814..74f1c90b998 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.marshaller.Marshallers.jdk;
public class IgniteCoreMessagesSerializationTest extends
AbstractMessageSerializationTest {
/** {@inheritDoc} */
@Override protected MessageFactoryProvider messageFactory() {
- return new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader());
+ return new CoreMessagesProvider(jdk(), U.gridClassLoader());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
index 53fa342460b..f124eb98228 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -46,7 +46,7 @@ public class CompressedMessageTest {
@Test
public void testWriteReadHugeMessage() {
MessageFactory msgFactory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[]{
- new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
+ new CoreMessagesProvider(jdk(), U.gridClassLoader())});
DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index a41490c0e40..15ddd54040c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -150,7 +150,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest
extends GridCommonAbst
e0.markFiltered();
IgniteMessageFactoryImpl msgFactory =
- new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())});
+ new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new
CoreMessagesProvider(jdk(), U.gridClassLoader())});
ByteBuffer buf = ByteBuffer.allocate(4096);
DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 1ca7217b67d..f82c844bda9 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -154,7 +154,7 @@ public abstract class GridAbstractCommunicationSelfTest<T
extends CommunicationS
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(new
MessageFactoryProvider[] {
- new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
customMessageFactory()}));
+ new CoreMessagesProvider(jdk(), U.gridClassLoader()),
customMessageFactory()}));
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 997d5b59fd4..1bc2358cc7e 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -434,7 +434,7 @@ public class
GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+ new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
);
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 3f2d90924bf..a604da0edb7 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -252,7 +252,7 @@ public class GridTcpCommunicationSpiConfigSelfTest extends
GridSpiAbstractConfig
node.setId(rsrcs.getNodeId());
ctx.messageFactory(new IgniteMessageFactoryImpl(new
MessageFactoryProvider[]{
- new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
GRID_TEST_MESSAGE_FACTORY}));
+ new CoreMessagesProvider(jdk(), U.gridClassLoader()),
GRID_TEST_MESSAGE_FACTORY}));
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index c043c068fc4..6a5d26eb266 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -468,7 +468,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest
extends GridSpiAbstrac
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] {new
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
GRID_TEST_MESSAGE_FACTORY})
+ new MessageFactoryProvider[] {new
CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
);
ctx.timeoutProcessor(timeoutProcessor);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 56c6daceca5..de46f4b674a 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -400,7 +400,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T
extends CommunicationS
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] {new
CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()),
GRID_TEST_MESSAGE_FACTORY})
+ new MessageFactoryProvider[] {new
CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
);
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index d4f00e425a4..633bd332eaf 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -729,7 +729,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T
extends CommunicationSpi<
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+ new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
);
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 58a321e21c8..06d657a8a3b 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -453,7 +453,7 @@ public class
IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
GridSpiTestContext ctx = initSpiContext();
ctx.messageFactory(new IgniteMessageFactoryImpl(
- new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
+ new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(),
U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})
);
ctx.setLocalNode(node);
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 2aaabee1aeb..decf2bf4b5c 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -121,7 +121,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi
implements IgniteDiscov
assert !started();
this.msgFactory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[] {
- new CoreMessagesProvider(jdk(), jdk(),
U.resolveClassLoader(ignite().configuration())),
+ new CoreMessagesProvider(jdk(),
U.resolveClassLoader(ignite().configuration())),
msgFactoryProvider
});
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 4eb5d79e0b6..b5c5035b7d0 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -554,7 +554,7 @@ public class GridSpiTestContext implements IgniteSpiContext
{
/** {@inheritDoc} */
@Override public MessageFactory messageFactory() {
if (factory == null)
- factory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), jdk(),
+ factory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[]{new CoreMessagesProvider(jdk(),
U.gridClassLoader())});
return factory;
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index fe6becffafc..e6638d4e112 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -85,7 +85,6 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.marshaller.Marshallers;
import org.apache.ignite.metric.IgniteMetrics;
import org.apache.ignite.plugin.IgnitePlugin;
import org.apache.ignite.plugin.PluginNotFoundException;
@@ -156,7 +155,7 @@ public class IgniteMock implements IgniteEx {
ClassLoader lrd = staticCfg == null ? U.gridClassLoader() :
U.resolveClassLoader(staticCfg);
msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]
{
- new CoreMessagesProvider(marshaller, Marshallers.jdk(), lrd)});
+ new CoreMessagesProvider(marshaller, lrd)});
try {
kernalCtx = new StandaloneGridKernalContext(new
GridTestLog4jLogger(), null) {