Repository: ignite Updated Branches: refs/heads/ignite-2.1.2-exchange 377cc9d7a -> 5b2400ab4
Pass io policy in GridMessageListener. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdd31af7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdd31af7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdd31af7 Branch: refs/heads/ignite-2.1.2-exchange Commit: bdd31af760f745c9ab05f75d100649a30dfe4f1e Parents: 7db925c Author: sboikov <[email protected]> Authored: Mon Jul 3 15:45:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jul 3 15:45:04 2017 +0300 ---------------------------------------------------------------------- .../checkpoint/GridCheckpointManager.java | 2 +- .../managers/communication/GridIoManager.java | 10 +-- .../communication/GridMessageListener.java | 3 +- .../deployment/GridDeploymentCommunication.java | 4 +- .../eventstorage/GridEventStorageManager.java | 4 +- .../processors/cache/GridCacheIoManager.java | 77 +++++++++++--------- .../cache/binary/BinaryMetadataTransport.java | 4 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../processors/cluster/ClusterProcessor.java | 2 +- .../continuous/GridContinuousProcessor.java | 4 +- .../datastreamer/DataStreamProcessor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 2 +- .../processors/igfs/IgfsDataManager.java | 2 +- .../igfs/IgfsFragmentizerManager.java | 4 +- .../processors/job/GridJobProcessor.java | 8 +- .../GridMarshallerMappingProcessor.java | 4 +- .../processors/query/GridQueryProcessor.java | 2 +- .../handlers/task/GridTaskCommandHandler.java | 4 +- .../processors/task/GridTaskProcessor.java | 6 +- .../jobstealing/JobStealingCollisionSpi.java | 2 +- ...idCommunicationManagerListenersSelfTest.java | 2 +- .../GridCommunicationSendMessageSelfTest.java | 2 +- .../cache/GridCachePartitionedGetSelfTest.java | 2 +- ...lerCacheClientRequestsMappingOnMissTest.java | 6 +- ...naryObjectMetadataExchangeMultinodeTest.java | 6 +- ...DeadlockDetectionMessageMarshallingTest.java | 2 +- .../communication/GridIoManagerBenchmark.java | 4 +- .../communication/GridIoManagerBenchmark0.java | 12 +-- .../communication/GridCacheMessageSelfTest.java | 2 +- .../testframework/GridSpiTestContext.java | 5 +- .../hadoop/shuffle/HadoopShuffle.java | 2 +- .../query/h2/opt/GridH2IndexBase.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- 34 files changed, 106 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 782ee5e..aae1a3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -464,7 +464,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @param msg Received message. */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridCheckpointRequest req = (GridCheckpointRequest)msg; if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index a1ddaf4..3ff44a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -337,7 +337,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa log.debug(startInfo()); addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) @@ -1553,7 +1553,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa CUR_PLC.set(plc); try { - lsnr.onMessage(nodeId, msg); + lsnr.onMessage(nodeId, msg, plc); } finally { if (change) @@ -2323,14 +2323,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param nodeId Node ID. * @param msg Message. */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridMessageListener[] arr0 = arr; if (arr0 == null) return; for (GridMessageListener l : arr0) - l.onMessage(nodeId, msg); + l.onMessage(nodeId, msg, plc); } /** @@ -2430,7 +2430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** {@inheritDoc} */ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", "OverlyStrongTypeCast"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridIoUserMessage)) { U.error(log, "Received unknown message (potentially fatal problem): " + msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java index 3993591..c7de57c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridMessageListener.java @@ -30,6 +30,7 @@ public interface GridMessageListener extends EventListener { * @param nodeId ID of node that sent the message. Note that may have already * left topology by the time this message is received. * @param msg Message received. + * @param plc Message policy (pool). */ - public void onMessage(UUID nodeId, Object msg); + public void onMessage(UUID nodeId, Object msg, byte plc); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 23d186a..2a5f7ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -83,7 +83,7 @@ class GridDeploymentCommunication { this.log = log.getLogger(getClass()); peerLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { processDeploymentRequest(nodeId, msg); } }; @@ -422,7 +422,7 @@ class GridDeploymentCommunication { }; GridMessageListener resLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index bd43e43..7836004 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1008,7 +1008,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> GridMessageListener resLsnr = new GridMessageListener() { @SuppressWarnings("deprecation") - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1185,7 +1185,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> */ private class RequestListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a920bd0..49cfcdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -155,7 +155,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { - @Override public void onMessage(final UUID nodeId, final Object msg) { + @Override public void onMessage(final UUID nodeId, final Object msg, final byte plc) { if (log.isDebugEnabled()) log.debug("Received unordered cache communication message [nodeId=" + nodeId + ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']'); @@ -196,7 +196,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override public void apply(IgniteInternalFuture<?> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - handleMessage(nodeId, cacheMsg); + handleMessage(nodeId, cacheMsg, plc); } }); } @@ -269,40 +269,48 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug(msg0.toString()); } - handleMessage(nodeId, cacheMsg); + handleMessage(nodeId, cacheMsg, plc); } }; if (stripe >= 0) cctx.kernalContext().getStripedExecutorService().execute(stripe, c); - else - cctx.kernalContext().closure().runLocalSafe(c); + else { + try { + cctx.kernalContext().pools().poolForPolicy(plc).execute(c); + } + catch (IgniteCheckedException e) { + U.error(cacheMsg.messageLogger(cctx), "Failed to get pool for policy: " + plc, e); + } + } } }); return; } - handleMessage(nodeId, cacheMsg); + handleMessage(nodeId, cacheMsg, plc); } }; /** * @param nodeId Sender node ID. * @param cacheMsg Message. + * @param plc Message policy. */ @SuppressWarnings("unchecked") - private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) { - handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers); + private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, byte plc) { + handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers, plc); } /** * @param nodeId Sender node ID. * @param cacheMsg Message. * @param msgHandlers Message handlers. + * @param plc Message policy. */ @SuppressWarnings("unchecked") - private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) { + private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers, byte plc) { Lock lock = rw.readLock(); lock.lock(); @@ -356,7 +364,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { return; } - onMessage0(nodeId, cacheMsg, c); + onMessage0(nodeId, cacheMsg, c, plc); } finally { lock.unlock(); @@ -517,10 +525,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param nodeId Node ID. * @param cacheMsg Cache message. * @param c Handler closure. + * @param plc Message policy. */ @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg, - final IgniteBiInClosure<UUID, GridCacheMessage> c) { + final IgniteBiInClosure<UUID, GridCacheMessage> c, byte plc) { try { if (stopping) { if (log.isDebugEnabled()) @@ -536,7 +545,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { unmarshall(nodeId, cacheMsg); if (cacheMsg.classError() != null) - processFailedMessage(nodeId, cacheMsg, c); + processFailedMessage(nodeId, cacheMsg, c, plc); else processMessage(nodeId, cacheMsg, c); } @@ -669,15 +678,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * * @param nodeId Node ID. * @param msg Message. + * @param c Closure. + * @param plc Message policy. * @throws IgniteCheckedException If failed. */ - private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) + private void processFailedMessage(UUID nodeId, + GridCacheMessage msg, + IgniteBiInClosure<UUID, GridCacheMessage> c, + byte plc) throws IgniteCheckedException { assert msg != null; - GridCacheContext ctx = msg instanceof GridCacheIdMessage ? - cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null; - switch (msg.directType()) { case 30: { GridDhtLockRequest req = (GridDhtLockRequest)msg; @@ -688,9 +699,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.futureId(), req.miniId(), 0, - ctx.deploymentEnabled()); + false); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -723,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.onError(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); if (req.nearNodeId() != null) { GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), @@ -734,7 +745,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { nearRes.errors(new UpdateErrors(req.classError())); - sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); } } @@ -753,7 +764,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -770,7 +781,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -793,7 +804,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -831,7 +842,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { null, false); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -872,7 +883,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cctx.node(nodeId), TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), res, - ctx.ioPolicy(), + plc, Long.MAX_VALUE); } @@ -897,7 +908,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -935,7 +946,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -953,7 +964,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -971,7 +982,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); } break; @@ -987,7 +998,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.onError(req.classError()); - sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(nodeId, res, cctx, plc); if (req.nearNodeId() != null) { GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), @@ -998,7 +1009,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { nearRes.errors(new UpdateErrors(req.classError())); - sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, ctx.ioPolicy()); + sendResponseOnFailedMessage(req.nearNodeId(), nearRes, cctx, plc); } } @@ -1540,7 +1551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "unchecked"}) - @Override public void onMessage(final UUID nodeId, Object msg) { + @Override public void onMessage(final UUID nodeId, Object msg, byte plc) { if (log.isDebugEnabled()) log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); @@ -1551,7 +1562,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { try { GridCacheMessage cacheMsg = (GridCacheMessage)msg; - onMessage0(nodeId, cacheMsg, c); + onMessage0(nodeId, cacheMsg, c, plc); } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 00c760f..5fd7295 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -561,7 +561,7 @@ final class BinaryMetadataTransport { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof MetadataRequestMessage : msg; MetadataRequestMessage msg0 = (MetadataRequestMessage) msg; @@ -606,7 +606,7 @@ final class BinaryMetadataTransport { private final class MetadataResponseListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof MetadataResponseMessage : msg; MetadataResponseMessage msg0 = (MetadataResponseMessage) msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a9aa13d..3a3b766 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2433,7 +2433,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private class DeadlockDetectionListener implements GridMessageListener { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridCacheMessage cacheMsg = (GridCacheMessage)msg; Throwable err = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index ed83650..0bd2370 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -146,7 +146,7 @@ public class ClusterProcessor extends GridProcessorAdapter { EVT_NODE_FAILED, EVT_NODE_LEFT); ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgniteDiagnosticMessage) { IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index f641399..b1d3442 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -289,7 +289,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { + @Override public void onMessage(UUID nodeId, Object obj, byte plc) { GridContinuousMessage msg = (GridContinuousMessage)obj; if (msg.data() == null && msg.dataBytes() != null) { @@ -771,7 +771,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private void registerMessageListener(GridContinuousHandler hnd) { if (hnd.orderedTopic() != null) { ctx.io().addMessageListener(hnd.orderedTopic(), new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object obj) { + @Override public void onMessage(UUID nodeId, Object obj, byte plc) { GridContinuousMessage msg = (GridContinuousMessage)obj; // Only notification can be ordered. http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index c52f7ac..31ae1e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -82,7 +82,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (!ctx.clientNode()) { ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof DataStreamerRequest; processRequest(nodeId, (DataStreamerRequest)msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 40988d3..ae441de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -319,7 +319,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId())); ctx.io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof DataStreamerResponse; DataStreamerResponse res = (DataStreamerResponse)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 8ae6db8..a4ea337 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -158,7 +158,7 @@ public class IgfsDataManager extends IgfsManager { topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName); igfsCtx.kernalContext().io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsBlocksMessage) processBlocksMessage(nodeId, (IgfsBlocksMessage)msg); else if (msg instanceof IgfsAckMessage) http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java index 7797f89..0c75bc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java @@ -453,7 +453,7 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsFragmentizerResponse) { IgfsFragmentizerResponse res = (IgfsFragmentizerResponse)msg; @@ -673,7 +673,7 @@ public class IgfsFragmentizerManager extends IgfsManager { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof IgfsFragmentizerRequest || msg instanceof IgfsSyncMessage) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 408396a..5ae4da6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -447,7 +447,7 @@ public class GridJobProcessor extends GridProcessorAdapter { final Condition cond = lock.newCondition(); GridMessageListener msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { String err = null; GridJobSiblingsResponse res = null; @@ -1856,7 +1856,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobSessionListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1872,7 +1872,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobCancelListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; @@ -1890,7 +1890,7 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobExecutionListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert nodeId != null; assert msg != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 8de6c49..2543042 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -168,7 +168,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof MissingMappingRequestMessage : msg; MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg; @@ -200,7 +200,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { */ private final class MissingMappingResponseListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg instanceof MissingMappingResponseMessage : msg; MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index dd07584..d55a129 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -207,7 +207,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { valCtx = new CacheQueryObjectValueContext(ctx); ioLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof SchemaOperationStatusMessage) { SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java index 99ba335..d9b49cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -109,7 +109,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { super(ctx); ctx.io().addMessageListener(TOPIC_REST, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridTaskResultRequest)) { U.warn(log, "Received unexpected message instead of task result request: " + msg); @@ -425,7 +425,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { final Condition cond = lock.newCondition(); GridMessageListener msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { String err = null; GridTaskResultResponse res = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 6ae97dd..d7a022e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -1282,7 +1282,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof GridJobExecuteResponse) processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg); else if (jobResOnly) @@ -1326,7 +1326,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { */ private class JobSiblingsMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!(msg instanceof GridJobSiblingsRequest)) { U.warn(log, "Received unexpected message instead of siblings request: " + msg); @@ -1398,7 +1398,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { */ private class TaskCancelMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { assert msg != null; if (!(msg instanceof GridTaskCancelRequest)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index 6f2c099..39a8e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -648,7 +648,7 @@ public class JobStealingCollisionSpi extends IgniteSpiAdapter implements Collisi spiCtx.addMessageListener( msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { MessageInfo info = rcvMsgMap.get(nodeId); if (info == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java index 1738da8..03b7921 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationManagerListenersSelfTest.java @@ -159,7 +159,7 @@ public class GridCommunicationManagerListenersSelfTest extends GridCommonAbstrac } /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index 29b7847..3563c77 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -143,7 +143,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT); mgr1.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msgCls.isInstance(msg)) latch.countDown(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java index 308f2b4..d5988f1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java @@ -224,7 +224,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest { ((IgniteKernal)g).context().io().addMessageListener( TOPIC_CACHE, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']'); if (msg instanceof GridNearSingleGetRequest) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java index f5f2512..057b970 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingOnMissTest.java @@ -300,10 +300,10 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_MAPPING_MARSH.ordinal()]; GridMessageListener wrapper = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { mappingReqsCounter.incrementAndGet(); - delegate.onMessage(nodeId, msg); + delegate.onMessage(nodeId, msg, plc); } }; @@ -321,7 +321,7 @@ public class IgniteMarshallerCacheClientRequestsMappingOnMissTest extends GridCo ioMgr.removeMessageListener(GridTopic.TOPIC_MAPPING_MARSH); ioMgr.addMessageListener(GridTopic.TOPIC_MAPPING_MARSH, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { new Thread(new Runnable() { @Override public void run() { mappingReqsCounter.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java index 9370e27..6ff703e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java @@ -396,7 +396,7 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ); ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { new Thread(new Runnable() { @Override public void run() { metadataReqsCounter.incrementAndGet(); @@ -416,9 +416,9 @@ public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridComm final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()]; GridMessageListener wrapper = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { metadataReqsCounter.incrementAndGet(); - delegate.onMessage(nodeId, msg); + delegate.onMessage(nodeId, msg, plc); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java index 9126053..1a48cec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionMessageMarshallingTest.java @@ -76,7 +76,7 @@ public class TxDeadlockDetectionMessageMarshallingTest extends GridCommonAbstrac final AtomicBoolean res = new AtomicBoolean(); clientCtx.gridIO().addMessageListener(TOPIC, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (msg instanceof TxLocksResponse) { try { ((TxLocksResponse)msg).finishUnmarshal(clientCtx, clientCtx.deploy().globalLoader()); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java index 03bbb00..5671158 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark.java @@ -240,7 +240,7 @@ public class GridIoManagerBenchmark { GridMessageListener lsnr = new GridMessageListener() { private ClusterNode node; - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (node == null) node = g.context().discovery().node(nodeId); @@ -336,7 +336,7 @@ public class GridIoManagerBenchmark { */ private static class SenderMessageListener implements GridMessageListener { /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); if (testLatency) http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java index 0f0332f..7b1d972 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridIoManagerBenchmark0.java @@ -130,7 +130,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -141,7 +141,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); sem.release(); @@ -224,7 +224,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -235,7 +235,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { map.get(((GridTestMessage)msg).id()).countDown(); } }); @@ -324,7 +324,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { rcv.addMessageListener( topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { rcv.sendToCustomTopic(sndNode, topic, (Message)msg, PUBLIC_POOL); } @@ -335,7 +335,7 @@ public class GridIoManagerBenchmark0 extends GridCommonAbstractTest { }); snd.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { msgCntr.increment(); sem.release(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java index 4a6b765..435af8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java @@ -139,7 +139,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT); mgr1.addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { try { latch.countDown(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 7d3c5d6..93cd911 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridIoUserMessage; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -328,7 +329,7 @@ public class GridSpiTestContext implements IgniteSpiContext { @SuppressWarnings("deprecation") public void triggerMessage(ClusterNode node, Object msg) { for (GridMessageListener lsnr : msgLsnrs) - lsnr.onMessage(node.id(), msg); + lsnr.onMessage(node.id(), msg, GridIoPolicy.SYSTEM_POOL); } /** {@inheritDoc} */ @@ -667,7 +668,7 @@ public class GridSpiTestContext implements IgniteSpiContext { @SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "ConstantConditions", "OverlyStrongTypeCast"}) - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridIoUserMessage ioMsg = (GridIoUserMessage)msg; Object msgBody = ioMsg.body(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 3296993..cd1c93c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -63,7 +63,7 @@ public class HadoopShuffle extends HadoopComponent { super.start(ctx); ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { onMessageReceived(nodeId, (HadoopMessage)msg); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 3dabc58..542adf0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -135,7 +135,7 @@ public abstract class GridH2IndexBase extends BaseIndex { msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName()); msgLsnr = new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { GridSpinBusyLock l = desc.indexing().busyLock(); if (!l.enterBusy()) http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 6b7ba75..fcf5f10 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -172,7 +172,7 @@ public class GridMapQueryExecutor { }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/bdd31af7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index b85fa61..85a7e0b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -169,7 +169,7 @@ public class GridReduceQueryExecutor { log = ctx.log(GridReduceQueryExecutor.class); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return;
