This is an automated email from the ASF dual-hosted git repository.
anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d8ed7788958 IGNITE-28636 Do not check message type by raw numbers.
(#13110)
d8ed7788958 is described below
commit d8ed77889584bf413fab85617e8978829bc422ca
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri May 8 21:49:56 2026 +0300
IGNITE-28636 Do not check message type by raw numbers. (#13110)
---
.../processors/cache/GridCacheIoManager.java | 473 +++++++++------------
.../cache/transactions/IgniteTxManager.java | 68 ++-
2 files changed, 240 insertions(+), 301 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5c639d08b25..4dfdb6b3599 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -68,6 +68,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
@@ -747,329 +748,275 @@ public class GridCacheIoManager extends
GridCacheSharedManagerAdapter {
throws IgniteCheckedException {
assert msg != null;
- switch (msg.directType()) {
- case 10022: {
- GridDhtLockRequest req = (GridDhtLockRequest)msg;
+ if (msg instanceof GridDhtLockRequest) {
+ GridDhtLockRequest req = (GridDhtLockRequest)msg;
- GridDhtLockResponse res = new GridDhtLockResponse(
- req.cacheId(),
- req.version(),
- req.futureId(),
- req.miniId(),
- 0);
+ GridDhtLockResponse res = new GridDhtLockResponse(
+ req.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ 0);
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
-
- break;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridDhtTxPrepareRequest) {
+ GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
- case 10016: {
- GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
+ GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.deployInfo() != null);
- GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.deployInfo() != null);
+ res.error(req.classError());
- res.error(req.classError());
+ sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+ }
+ else if (msg instanceof GridDhtAtomicUpdateRequest) {
+ GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
- sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
- }
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+ req.cacheId(),
+ req.partition(),
+ req.futureId());
- break;
+ res.onError(req.classError());
- case 10303: {
- GridDhtAtomicUpdateRequest req =
(GridDhtAtomicUpdateRequest)msg;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- GridDhtAtomicUpdateResponse res = new
GridDhtAtomicUpdateResponse(
- req.cacheId(),
+ if (req.nearNodeId() != null) {
+ GridDhtAtomicNearResponse nearRes = new
GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
- req.futureId());
-
- res.onError(req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
-
- if (req.nearNodeId() != null) {
- GridDhtAtomicNearResponse nearRes = new
GridDhtAtomicNearResponse(req.cacheId(),
- req.partition(),
- req.nearFutureId(),
- nodeId,
- req.flags());
-
- nearRes.errors(new UpdateErrors(req.classError()));
-
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes,
cctx, plc);
- }
- }
-
- break;
-
- case 10305: {
- GridNearAtomicFullUpdateRequest req =
(GridNearAtomicFullUpdateRequest)msg;
-
- GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
- req.cacheId(),
+ req.nearFutureId(),
nodeId,
- req.futureId(),
- req.partition(),
- false);
-
- res.error(req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
-
- break;
+ req.flags());
- case 10300: {
- GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
-
- GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
- req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.classError()
- );
+ nearRes.errors(new UpdateErrors(req.classError()));
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx,
plc);
}
+ }
+ else if (msg instanceof GridNearAtomicFullUpdateRequest) {
+ GridNearAtomicFullUpdateRequest req =
(GridNearAtomicFullUpdateRequest)msg;
- break;
+ GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
+ req.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false);
- case 10313: {
- GridNearGetRequest req = (GridNearGetRequest)msg;
+ res.error(req.classError());
- GridNearGetResponse res = new GridNearGetResponse(
- req.cacheId(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.deployInfo() != null);
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridDhtForceKeysRequest) {
+ GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
- res.error(req.classError());
+ GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+ req.cacheId(),
+ req.futureId(),
+ req.miniId(),
+ req.classError()
+ );
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
-
- break;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearGetRequest) {
+ GridNearGetRequest req = (GridNearGetRequest)msg;
- case 10314: {
- GridNearGetResponse res = (GridNearGetResponse)msg;
+ GridNearGetResponse res = new GridNearGetResponse(
+ req.cacheId(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.deployInfo() != null);
- CacheGetFuture fut =
(CacheGetFuture)cctx.mvcc().future(res.futureId());
+ res.error(req.classError());
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for get response
[sender=" + nodeId + ", res=" + res + ']');
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearGetResponse) {
+ GridNearGetResponse res = (GridNearGetResponse)msg;
- return;
- }
+ CacheGetFuture fut =
(CacheGetFuture)cctx.mvcc().future(res.futureId());
- res.error(res.classError());
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response
[sender=" + nodeId + ", res=" + res + ']');
- fut.onResult(nodeId, res);
+ return;
}
- break;
-
- case 10025: {
- GridNearLockRequest req = (GridNearLockRequest)msg;
-
- GridNearLockResponse res = new GridNearLockResponse(
- req.cacheId(),
- req.version(),
- req.futureId(),
- req.miniId(),
- false,
- 0,
- req.classError(),
- null,
- false);
+ res.error(res.classError());
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
+ fut.onResult(nodeId, res);
+ }
+ else if (msg instanceof GridNearLockRequest) {
+ GridNearLockRequest req = (GridNearLockRequest)msg;
+
+ GridNearLockResponse res = new GridNearLockResponse(
+ req.cacheId(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ false,
+ 0,
+ req.classError(),
+ null,
+ false);
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearTxPrepareRequest) {
+ GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
+
+ GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+ req.partition(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.version(),
+ req.version(),
+ null,
+ null,
+ null,
+ false,
+ req.deployInfo() != null);
+
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+ }
+ else if (msg instanceof GridCacheQueryRequest) {
+ GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
- break;
+ GridCacheQueryResponse res = new GridCacheQueryResponse(
+ req.cacheId(),
+ req.id(),
+ req.classError(),
+ cctx.deploymentEnabled());
- case 10020: {
- GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
+ ClusterNode node = cctx.node(nodeId);
- GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
- req.partition(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.version(),
- req.version(),
- null,
- null,
- null,
- false,
- req.deployInfo() != null);
-
- res.error(req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+ if (node == null) {
+ U.error(log, "Failed to send message because node left grid
[nodeId=" + nodeId +
+ ", msg=" + msg + ']');
}
-
- break;
-
- case 10911: {
- GridCacheQueryRequest req = (GridCacheQueryRequest)msg;
-
- GridCacheQueryResponse res = new GridCacheQueryResponse(
- req.cacheId(),
- req.id(),
- req.classError(),
- cctx.deploymentEnabled());
-
- ClusterNode node = cctx.node(nodeId);
-
- if (node == null) {
- U.error(log, "Failed to send message because node left
grid [nodeId=" + nodeId +
- ", msg=" + msg + ']');
- }
- else {
- cctx.io().sendOrderedMessage(
- node,
- TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId,
req.id()),
- res,
- plc,
- Long.MAX_VALUE);
- }
+ else {
+ cctx.io().sendOrderedMessage(
+ node,
+ TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()),
+ res,
+ plc,
+ Long.MAX_VALUE);
}
+ }
+ else if (msg instanceof GridNearSingleGetRequest) {
+ GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
- break;
+ GridNearSingleGetResponse res = new GridNearSingleGetResponse(
+ req.cacheId(),
+ req.futureId(),
+ req.topologyVersion(),
+ null,
+ false,
+ req.deployInfo() != null);
- case 10615:
- case 120: {
- processMessage(nodeId, msg, c); // Will be handled by
Rebalance Demander.
- }
+ res.error(req.classError());
- break;
-
- case 10315: {
- GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearSingleGetResponse) {
+ GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
- GridNearSingleGetResponse res = new GridNearSingleGetResponse(
- req.cacheId(),
- req.futureId(),
- req.topologyVersion(),
- null,
- false,
- req.deployInfo() != null);
+ GridPartitionedSingleGetFuture fut =
(GridPartitionedSingleGetFuture)cctx.mvcc()
+ .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
- res.error(req.classError());
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for get response
[sender=" + nodeId + ", res=" + res + ']');
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ return;
}
- break;
+ res.error(res.classError());
- case 10316: {
- GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
+ fut.onResult(nodeId, res);
+ }
+ else if (msg instanceof GridNearAtomicSingleUpdateRequest) {
+ GridNearAtomicSingleUpdateRequest req =
(GridNearAtomicSingleUpdateRequest)msg;
- GridPartitionedSingleGetFuture fut =
(GridPartitionedSingleGetFuture)cctx.mvcc()
- .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId()));
+ GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
+ req.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false);
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for get response
[sender=" + nodeId + ", res=" + res + ']');
+ res.error(req.classError());
- return;
- }
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearAtomicSingleUpdateInvokeRequest) {
+ GridNearAtomicSingleUpdateInvokeRequest req =
(GridNearAtomicSingleUpdateInvokeRequest)msg;
- res.error(res.classError());
+ GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
+ req.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false);
- fut.onResult(nodeId, res);
- }
+ res.error(req.classError());
- break;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridNearAtomicSingleUpdateFilterRequest) {
+ GridNearAtomicSingleUpdateFilterRequest req =
(GridNearAtomicSingleUpdateFilterRequest)msg;
- case 10308: {
- GridNearAtomicSingleUpdateRequest req =
(GridNearAtomicSingleUpdateRequest)msg;
+ GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
+ req.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false);
- GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
- req.cacheId(),
- nodeId,
- req.futureId(),
- req.partition(),
- false);
+ res.error(req.classError());
- res.error(req.classError());
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ }
+ else if (msg instanceof GridDhtAtomicSingleUpdateRequest) {
+ GridDhtAtomicSingleUpdateRequest req =
(GridDhtAtomicSingleUpdateRequest)msg;
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+ req.cacheId(),
+ req.partition(),
+ req.futureId());
- break;
+ res.onError(req.classError());
- case 10309: {
- GridNearAtomicSingleUpdateInvokeRequest req =
(GridNearAtomicSingleUpdateInvokeRequest)msg;
+ sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
- req.cacheId(),
- nodeId,
- req.futureId(),
+ if (req.nearNodeId() != null) {
+ GridDhtAtomicNearResponse nearRes = new
GridDhtAtomicNearResponse(req.cacheId(),
req.partition(),
- false);
-
- res.error(req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
-
- break;
-
- case 10310: {
- GridNearAtomicSingleUpdateFilterRequest req =
(GridNearAtomicSingleUpdateFilterRequest)msg;
-
- GridNearAtomicUpdateResponse res = new
GridNearAtomicUpdateResponse(
- req.cacheId(),
+ req.nearFutureId(),
nodeId,
- req.futureId(),
- req.partition(),
- false);
-
- res.error(req.classError());
-
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
- }
-
- break;
-
- case 10306: {
- GridDhtAtomicSingleUpdateRequest req =
(GridDhtAtomicSingleUpdateRequest)msg;
-
- GridDhtAtomicUpdateResponse res = new
GridDhtAtomicUpdateResponse(
- req.cacheId(),
- req.partition(),
- req.futureId());
-
- res.onError(req.classError());
+ req.flags());
- sendResponseOnFailedMessage(nodeId, res, cctx, plc);
+ nearRes.errors(new UpdateErrors(req.classError()));
- if (req.nearNodeId() != null) {
- GridDhtAtomicNearResponse nearRes = new
GridDhtAtomicNearResponse(req.cacheId(),
- req.partition(),
- req.nearFutureId(),
- nodeId,
- req.flags());
-
- nearRes.errors(new UpdateErrors(req.classError()));
-
- sendResponseOnFailedMessage(req.nearNodeId(), nearRes,
cctx, plc);
- }
+ sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx,
plc);
}
-
- break;
-
- default:
- throw new IgniteCheckedException("Failed to send response to
node. Unsupported direct type [message="
- + msg + "]", msg.classError());
+ }
+ else if (msg instanceof GridDhtPartitionSupplyMessage)
+ processMessage(nodeId, msg, c); // Will be handled by Rebalance
Demander.
+ else {
+ throw new IgniteCheckedException("Failed to send response to node.
Unsupported direct type [message="
+ + msg + "]", msg.classError());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 29c5283c0af..cd2704daf90 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3395,54 +3395,46 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
* @param msg Message.
*/
private void processFailedMessage(UUID nodeId, GridCacheMessage msg,
Throwable err) throws IgniteCheckedException {
- switch (msg.directType()) {
- case 10003: {
- TxLocksRequest req = (TxLocksRequest)msg;
+ if (msg instanceof TxLocksRequest) {
+ TxLocksRequest req = (TxLocksRequest)msg;
- TxLocksResponse res = new TxLocksResponse();
+ TxLocksResponse res = new TxLocksResponse();
- res.futureId(req.futureId());
+ res.futureId(req.futureId());
- try {
- cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res,
SYSTEM_POOL);
- }
- catch (ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to send response, node failed: "
+ nodeId);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send response to node (is node
still alive?) [nodeId=" + nodeId +
- ", res=" + res + ']', e);
- }
+ try {
+ cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res,
SYSTEM_POOL);
}
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send response, node failed: " +
nodeId);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send response to node (is node
still alive?) [nodeId=" + nodeId +
+ ", res=" + res + ']', e);
+ }
+ }
+ else if (msg instanceof TxLocksResponse) {
+ TxLocksResponse res = (TxLocksResponse)msg;
- break;
-
- case 10004: {
- TxLocksResponse res = (TxLocksResponse)msg;
-
- TxDeadlockFuture fut = future(res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for response
[sender=" + nodeId + ", res=" + res + ']');
+ TxDeadlockFuture fut = future(res.futureId());
- return;
- }
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for response
[sender=" + nodeId + ", res=" + res + ']');
- if (err == null)
- fut.onResult(nodeId, res);
- else
- fut.onDone(null, err);
+ return;
}
- break;
-
- default:
- throw new IgniteCheckedException("Failed to process
message. Unsupported direct type [msg=" +
- msg + ']', msg.classError());
+ if (err == null)
+ fut.onResult(nodeId, res);
+ else
+ fut.onDone(null, err);
+ }
+ else {
+ throw new IgniteCheckedException("Failed to process message.
Unsupported direct type [msg=" +
+ msg + ']', msg.classError());
}
-
}
/**