This is an automated email from the ASF dual-hosted git repository.
anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new fbd3eaed679 IGNITE-28693 Remove Calcite's MessageType enum (#13145)
fbd3eaed679 is described below
commit fbd3eaed679b5b1a0c04c31cd23a8be65f8200a3
Author: Vladimir Steshin <[email protected]>
AuthorDate: Wed May 20 14:39:55 2026 +0300
IGNITE-28693 Remove Calcite's MessageType enum (#13145)
---
.../query/calcite/exec/ExchangeServiceImpl.java | 9 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 11 ++-
.../calcite/message/CalciteMessageFactory.java | 34 ++++++--
.../query/calcite/message/MessageService.java | 2 +-
.../query/calcite/message/MessageServiceImpl.java | 36 ++++----
.../query/calcite/message/MessageType.java | 98 ----------------------
6 files changed, 55 insertions(+), 135 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index ebf5c85c533..6dff007ff79 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -33,7 +33,6 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTr
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
@@ -188,10 +187,10 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void init() {
- messageService().register((n, m) -> onMessage(n,
(QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
- messageService().register((n, m) -> onMessage(n,
(QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
- messageService().register((n, m) -> onMessage(n,
(QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
- messageService().register((n, m) -> onMessage(n,
(QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
+ messageService().register((n, m) -> onMessage(n,
(QueryInboxCloseMessage)m), QueryInboxCloseMessage.class);
+ messageService().register((n, m) -> onMessage(n,
(QueryBatchAcknowledgeMessage)m), QueryBatchAcknowledgeMessage.class);
+ messageService().register((n, m) -> onMessage(n,
(QueryBatchMessage)m), QueryBatchMessage.class);
+ messageService().register((n, m) -> onMessage(n,
(QueryCloseMessage)m), QueryCloseMessage.class);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 70bc993f8b7..f0207a65a1d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -78,7 +78,6 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.Performa
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
-import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
import
org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
@@ -478,9 +477,9 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** {@inheritDoc} */
@Override public void init() {
- messageService().register((n, m) -> onMessage(n,
(QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
- messageService().register((n, m) -> onMessage(n,
(QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
- messageService().register((n, m) -> onMessage(n,
(CalciteErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
+ messageService().register((n, m) -> onMessage(n,
(QueryStartRequest)m), QueryStartRequest.class);
+ messageService().register((n, m) -> onMessage(n,
(QueryStartResponse)m), QueryStartResponse.class);
+ messageService().register((n, m) -> onMessage(n,
(CalciteErrorMessage)m), CalciteErrorMessage.class);
eventManager().addDiscoveryEventListener(discoLsnr,
EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -537,7 +536,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
);
case EXPLAIN:
- return executeExplain(qry, (ExplainPlan)plan);
+ return executeExplain((ExplainPlan)plan);
case DDL:
return executeDdl(qry, (DdlPlan)plan);
@@ -883,7 +882,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
/** */
- private FieldsQueryCursor<List<?>> executeExplain(RootQuery<Row> qry,
ExplainPlan plan) {
+ private FieldsQueryCursor<List<?>> executeExplain(ExplainPlan plan) {
QueryCursorImpl<List<?>> cur = new
QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
index f3a9d6f431d..d15b5821e05 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java
@@ -17,21 +17,41 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import org.apache.ignite.internal.MarshallableMessage;
import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
/**
* Message factory.
*/
public class CalciteMessageFactory extends
AbstractMarshallableMessageFactoryProvider {
+ /** */
+ public static final short MIN_MESSAGE_TYPE = 300;
+
+ /** */
+ public static final short MAX_MESSAGE_TYPE = 311;
+
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
- for (MessageType type : MessageType.values()) {
- if
(MarshallableMessage.class.isAssignableFrom(type.messageClass()))
- register(factory, type.messageClass(), type.directType(),
schemaAwareMarsh, resolvedClsLdr);
- else
- register(factory, type.messageClass(), type.directType(),
dfltMarsh, dftlClsLdr);
- }
+ register(factory, QueryStartRequest.class, (short)300,
schemaAwareMarsh, resolvedClsLdr);
+ register(factory, QueryStartResponse.class, (short)301, dfltMarsh,
dftlClsLdr);
+ register(factory, CalciteErrorMessage.class, (short)302, dfltMarsh,
resolvedClsLdr);
+ register(factory, QueryBatchMessage.class, (short)303, dfltMarsh,
dftlClsLdr);
+ register(factory, QueryBatchAcknowledgeMessage.class, (short)304,
dfltMarsh, dftlClsLdr);
+ register(factory, QueryInboxCloseMessage.class, (short)305, dfltMarsh,
dftlClsLdr);
+ register(factory, QueryCloseMessage.class, (short)306, dfltMarsh,
dftlClsLdr);
+ register(factory, GenericValueMessage.class, (short)307,
schemaAwareMarsh, resolvedClsLdr);
+ register(factory, FragmentMapping.class, (short)308, dfltMarsh,
dftlClsLdr);
+ register(factory, ColocationGroup.class, (short)309, dfltMarsh,
dftlClsLdr);
+ register(factory, FragmentDescription.class, (short)310, dfltMarsh,
dftlClsLdr);
+ register(factory, QueryTxEntry.class, (short)311, dfltMarsh,
dftlClsLdr);
+ }
+
+ /** */
+ public static boolean isCalciteMessage(Message msg) {
+ return msg.directType() >= MIN_MESSAGE_TYPE && msg.directType() <=
MAX_MESSAGE_TYPE;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index caa1de990fe..aad52061e16 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -48,5 +48,5 @@ public interface MessageService extends Service {
* @param lsnr Listener.
* @param type Message type.
*/
- void register(MessageListener lsnr, MessageType type);
+ <T extends Message> void register(MessageListener lsnr, Class<T> type);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index dd421bf1f6c..4c8a699ca9c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -54,19 +54,19 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
private final ClassLoader clsLdr;
/** */
- private UUID localNodeId;
+ private UUID locNodeId;
/** */
- private final GridIoManager ioManager;
+ private final GridIoManager ioMgr;
/** */
private QueryTaskExecutor taskExecutor;
/** */
- private FailureProcessor failureProcessor;
+ private FailureProcessor failureProc;
/** */
- private Map<Short, MessageListener> lsnrs;
+ private Map<Class<? extends Message>, MessageListener> lsnrs;
/** */
public MessageServiceImpl(GridKernalContext ctx) {
@@ -74,29 +74,29 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
this.ctx = ctx.cache().context();
clsLdr = U.resolveClassLoader(ctx.config());
- ioManager = ctx.io();
+ ioMgr = ctx.io();
msgLsnr = this::onMessage;
}
/**
- * @param localNodeId Local node ID.
+ * @param locNodeId Local node ID.
*/
- public void localNodeId(UUID localNodeId) {
- this.localNodeId = localNodeId;
+ public void localNodeId(UUID locNodeId) {
+ this.locNodeId = locNodeId;
}
/**
* @return Local node ID.
*/
public UUID localNodeId() {
- return localNodeId;
+ return locNodeId;
}
/**
* @return IO manager.
*/
public GridIoManager ioManager() {
- return ioManager;
+ return ioMgr;
}
/**
@@ -114,17 +114,17 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
}
/**
- * @param failureProcessor Failure processor.
+ * @param failureProc Failure processor.
*/
- public void failureProcessor(FailureProcessor failureProcessor) {
- this.failureProcessor = failureProcessor;
+ public void failureProcessor(FailureProcessor failureProc) {
+ this.failureProc = failureProc;
}
/**
* @return Failure processor.
*/
public FailureProcessor failureProcessor() {
- return failureProcessor;
+ return failureProc;
}
/** {@inheritDoc} */
@@ -162,11 +162,11 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
}
/** {@inheritDoc} */
- @Override public void register(MessageListener lsnr, MessageType type) {
+ @Override public <T extends Message> void register(MessageListener lsnr,
Class<T> type) {
if (lsnrs == null)
lsnrs = new HashMap<>();
- MessageListener old = lsnrs.put(type.directType(), lsnr);
+ MessageListener old = lsnrs.put(type, lsnr);
assert old == null : old;
}
@@ -223,7 +223,7 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
/** */
private void onMessage(UUID nodeId, Object msg, byte plc) {
- if (msg instanceof Message &&
MessageType.isCalciteMessage((Message)msg))
+ if (msg instanceof Message &&
CalciteMessageFactory.isCalciteMessage((Message)msg))
onMessage(nodeId, (Message)msg);
}
@@ -232,7 +232,7 @@ public class MessageServiceImpl extends AbstractService
implements MessageServic
try {
prepareUnmarshal(msg);
- MessageListener lsnr =
Objects.requireNonNull(lsnrs.get(msg.directType()));
+ MessageListener lsnr =
Objects.requireNonNull(lsnrs.get(msg.getClass()));
lsnr.onMessage(nodeId, msg);
}
catch (IgniteCheckedException e) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
deleted file mode 100644
index 045dac80f69..00000000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.message;
-
-import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
-import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
-import org.apache.ignite.plugin.extensions.communication.Message;
-
-/** */
-public enum MessageType {
- /** */
- QUERY_START_REQUEST(300, QueryStartRequest.class),
-
- /** */
- QUERY_START_RESPONSE(301, QueryStartResponse.class),
-
- /** */
- QUERY_ERROR_MESSAGE(302, CalciteErrorMessage.class),
-
- /** */
- QUERY_BATCH_MESSAGE(303, QueryBatchMessage.class),
-
- /** */
- QUERY_BATCH_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage.class),
-
- /** */
- QUERY_INBOX_CANCEL_MESSAGE(305, QueryInboxCloseMessage.class),
-
- /** */
- QUERY_CLOSE_MESSAGE(306, QueryCloseMessage.class),
-
- /** */
- GENERIC_VALUE_MESSAGE(307, GenericValueMessage.class),
-
- /** */
- FRAGMENT_MAPPING(350, FragmentMapping.class),
-
- /** */
- COLOCATION_GROUP(351, ColocationGroup.class),
-
- /** */
- FRAGMENT_DESCRIPTION(352, FragmentDescription.class),
-
- /** */
- QUERY_TX_ENTRY(353, QueryTxEntry.class);
-
- /** */
- private final short directType;
-
- /** */
- private final Class<? extends Message> msgCls;
-
- /**
- * @param directType Direct type.
- */
- MessageType(int directType, Class<? extends Message> msgCls) {
- this.directType = (short)directType;
- this.msgCls = msgCls;
- }
-
- /** */
- static boolean isCalciteMessage(Message msg) {
- MessageType[] values = values();
- short msgType = msg.directType();
-
- return msgType >= values[0].directType() && msgType <=
values[values.length - 1].directType();
- }
-
- /**
- * @return Message direct type.
- */
- public short directType() {
- return directType;
- }
-
- /**
- * @return Message direct type.
- */
- public Class<? extends Message> messageClass() {
- return msgCls;
- }
-}