Minor: moved custom event processing into separate methods of GridContinuousProcessor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eef5afda Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eef5afda Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eef5afda Branch: refs/heads/ignite-7016 Commit: eef5afdacc9de77175c4452be864ee77930dc57a Parents: 5fa5ae7 Author: sboikov <sboi...@gridgain.com> Authored: Wed Nov 29 11:34:23 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Nov 29 11:34:23 2017 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 132 ++++++++++++------- 1 file changed, 85 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eef5afda/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index fa52be2..571d654 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -176,8 +176,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { - if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) - processStartRequest(snd, msg); + if (ctx.isStopping()) + return; + + processStartRequest(snd, msg); } }); @@ -186,39 +188,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void 
onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineAckDiscoveryMessage msg) { - StartFuture fut = startFuts.remove(msg.routineId()); - - if (fut != null) { - if (msg.errs().isEmpty()) { - LocalRoutineInfo routine = locInfos.get(msg.routineId()); - - // Update partition counters. - if (routine != null && routine.handler().isQuery()) { - Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode(); - Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters(); - - GridCacheAdapter<Object, Object> interCache = - ctx.cache().internalCache(routine.handler().cacheName()); - - GridCacheContext cctx = interCache != null ? interCache.context() : null; - - if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), - toCountersMap(cctx.topology().localUpdateCounters(false))); - - routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); - } - - fut.onRemoteRegistered(); - } - else { - IgniteCheckedException firstEx = F.first(msg.errs().values()); - - fut.onDone(firstEx); + if (ctx.isStopping()) + return; - stopRoutine(msg.routineId()); - } - } + processStartAckRequest(topVer, msg); } }); @@ -227,16 +200,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { - if (!snd.id().equals(ctx.localNodeId())) { - UUID routineId = msg.routineId(); - - unregisterRemote(routineId); - } + if (ctx.isStopping()) + return; - for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) { - if (clientInfo.remove(msg.routineId()) != null) - break; - } + processStopRequest(snd, msg); } }); @@ -245,10 +212,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineAckDiscoveryMessage msg) { - StopFuture fut = 
stopFuts.remove(msg.routineId()); + if (ctx.isStopping()) + return; - if (fut != null) - fut.onDone(); + processStopAckRequest(msg); } }); @@ -459,7 +426,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (log.isDebugEnabled()) { - log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + + log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + ", loc=" + ctx.localNodeId() + ", data=" + data.joiningNodeData() + ']'); @@ -976,11 +943,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param msg Message. + */ + private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) { + StopFuture fut = stopFuts.remove(msg.routineId()); + + if (fut != null) + fut.onDone(); + } + + /** + * @param snd Sender node. + * @param msg Message/ + */ + private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (!snd.id().equals(ctx.localNodeId())) { + UUID routineId = msg.routineId(); + + unregisterRemote(routineId); + } + + for (Map<UUID, LocalRoutineInfo> clientInfo : clientInfos.values()) { + if (clientInfo.remove(msg.routineId()) != null) + break; + } + } + + /** + * @param topVer Topology version. + * @param msg Message. + */ + private void processStartAckRequest(AffinityTopologyVersion topVer, + StartRoutineAckDiscoveryMessage msg) { + StartFuture fut = startFuts.remove(msg.routineId()); + + if (fut != null) { + if (msg.errs().isEmpty()) { + LocalRoutineInfo routine = locInfos.get(msg.routineId()); + + // Update partition counters. 
+ if (routine != null && routine.handler().isQuery()) { + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode(); + Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters(); + + GridCacheAdapter<Object, Object> interCache = + ctx.cache().internalCache(routine.handler().cacheName()); + + GridCacheContext cctx = interCache != null ? interCache.context() : null; + + if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) + cntrsPerNode.put(ctx.localNodeId(), + toCountersMap(cctx.topology().localUpdateCounters(false))); + + routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); + } + + fut.onRemoteRegistered(); + } + else { + IgniteCheckedException firstEx = F.first(msg.errs().values()); + + fut.onDone(firstEx); + + stopRoutine(msg.routineId()); + } + } + } + + /** * @param node Sender. * @param req Start request. */ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) { UUID routineId = req.routineId(); + if (node.id().equals(ctx.localNodeId())) + return; + StartRequestData data = req.startRequestData(); GridContinuousHandler hnd = data.handler();