zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b78f318 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b78f318 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b78f318 Branch: refs/heads/ignite-zk Commit: 0b78f318b12bb03f4ddce32e7f9c5d4eba8305ca Parents: 6961ddc Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 28 12:03:34 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 28 12:03:34 2017 +0300 ---------------------------------------------------------------------- .../DiscoveryMessageResultsCollector.java | 222 +++++++++++++++++++ .../continuous/GridContinuousProcessor.java | 191 +++++++--------- 2 files changed, 302 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0b78f318/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java new file mode 100644 index 0000000..72a4636 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class DiscoveryMessageResultsCollector<M, R> { + /** */ + private final Map<UUID, NodeMessage<M>> rcvd = new HashMap<>(); + + /** */ + private int leftMsgs; + + /** */ + protected DiscoCache discoCache; + + /** */ + protected final GridKernalContext ctx; + + /** + * @param ctx Context. + */ + protected DiscoveryMessageResultsCollector(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** + * @param rcvd Received messages. + * @return Result. + */ + protected abstract R createResult(Map<UUID, NodeMessage<M>> rcvd); + + /** + * @param r Result. + */ + protected abstract void onResultsCollected(R r); + + /** + * @param discoCache Discovery state when discovery message was received. + * @param node Node. + * @return {@code True} if need wait for result from given node. + */ + protected abstract boolean waitForNode(DiscoCache discoCache, ClusterNode node); + + /** + * @param discoCache Discovery state. + */ + public final void init(DiscoCache discoCache) { + assert discoCache != null; + + R res = null; + + synchronized (this) { + assert this.discoCache == null; + assert leftMsgs == 0 : leftMsgs; + + this.discoCache = discoCache; + + for (ClusterNode node : discoCache.allNodes()) { + if (ctx.discovery().alive(node) && waitForNode(discoCache, node) && !rcvd.containsKey(node.id())) { + rcvd.put(node.id(), new NodeMessage<>((M)null)); + + leftMsgs++; + } + } + + if (leftMsgs == 0) + res = createResult(rcvd); + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @param nodeId Node ID. + * @param msg Message. + */ + public final void onMessage(UUID nodeId, M msg) { + R res = null; + + synchronized (this) { + if (allReceived()) + return; + + NodeMessage<M> expMsg = rcvd.get(nodeId); + + if (expMsg == null) + rcvd.put(nodeId, new NodeMessage<>(msg)); + else if (expMsg.set(msg)) { + assert leftMsgs > 0; + + leftMsgs--; + + if (allReceived()) + res = createResult(rcvd); + } + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @param nodeId Failed node ID. + */ + public final void onNodeFail(UUID nodeId) { + R res = null; + + synchronized (this) { + if (allReceived()) + return; + + NodeMessage expMsg = rcvd.get(nodeId); + + if (expMsg != null && expMsg.onNodeFailed()) { + assert leftMsgs > 0 : leftMsgs; + + leftMsgs--; + + if (allReceived()) + res = createResult(rcvd); + } + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @return {@code True} if expected messages are initialized and all message are received. + */ + private boolean allReceived() { + return discoCache != null && leftMsgs == 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryMessageResultsCollector.class, this); + } + + /** + * + */ + protected static class NodeMessage<M> { + /** */ + boolean nodeFailed; + + /** */ + M msg; + + /** + * @param msg Message. + */ + NodeMessage(M msg) { + this.msg = msg; + } + + /** + * @return Message or {@code null} if node failed. + */ + @Nullable public M message() { + return msg; + } + + /** + * @return {@code True} if node result was not set before. + */ + boolean onNodeFailed() { + if (nodeFailed || msg != null) + return false; + + nodeFailed = true; + + return false; + } + + /** + * @param msg Received message. + * @return {@code True} if node result was not set before. + */ + boolean set(M msg) { + assert msg != null; + + if (this.msg != null) + return false; + + this.msg = msg; + + return !nodeFailed; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeMessage.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b78f318/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index ee12de3..950ac7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -49,12 +49,13 @@ import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageResultsCollector; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -1403,7 +1404,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { routinesInfo.addRoutineInfo(routineInfo); - final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); + final DiscoCache discoCache = ctx.discovery().discoCache(topVer); // Should not use marshaller and send messages from discovery thread. ctx.getSystemExecutorService().execute(new Runnable() { @@ -1412,7 +1413,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = startFuts.get(msg.routineId()); if (fut != null) - fut.initRemoteNodes(topVer, nodes); + fut.initRemoteNodes(discoCache); return; } @@ -2340,19 +2341,83 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private volatile boolean rmt; /** */ - private AffinityTopologyVersion topVer; - - /** */ - private int expRes; - - /** */ - private final Map<UUID, ContinuousRoutineStartResultMessage> res = new HashMap<>(); + private final DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults> + resCollect; /** * @param routineId Consume ID. */ StartFuture(UUID routineId) { this.routineId = routineId; + + resCollect = new DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults>(ctx) { + @Override protected RoutineRegisterResults createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) { + Map<UUID, Exception> errs = null; + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null; + + for (Map.Entry<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) { + ContinuousRoutineStartResultMessage msg = entry.getValue().message(); + + if (msg == null) + continue; + + if (msg.error()) { + byte[] errBytes = msg.errorBytes(); + + Exception err = null; + + if (errBytes != null) { + try { + err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous routine start error: " + e); + } + } + + if (err == null) { + err = new IgniteCheckedException("Failed to start continuous " + + "routine on node: " + entry.getKey()); + } + + if (errs == null) + errs = new HashMap<>(); + + errs.put(entry.getKey(), err); + } + else { + byte[] cntrsMapBytes = msg.countersMapBytes(); + + if (cntrsMapBytes != null) { + try { + CachePartitionPartialCountersMap cntrsMap = U.unmarshal( + marsh, + cntrsMapBytes, + U.resolveClassLoader(ctx.config())); + + if (cntrsPerNode == null) + cntrsPerNode = new HashMap<>(); + + cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous query update counters: " + e); + } + } + } + } + + return new RoutineRegisterResults(discoCache.version(), errs, cntrsPerNode); + } + + @Override protected void onResultsCollected(RoutineRegisterResults res0) { + onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + } + + @Override protected boolean waitForNode(DiscoCache discoCache, ClusterNode node) { + return !ctx.localNodeId().equals(node.id()); + } + }; } /** @@ -2400,28 +2465,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param topVer Topology version. - * @param nodes Nodes. + * @param discoCache Discovery state. */ - void initRemoteNodes(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes) { - RoutineRegisterResults res0 = null; - - synchronized (res) { - assert this.topVer == null && expRes == 0; - - this.topVer = topVer; - - for (ClusterNode node : nodes) { - if (!ctx.localNodeId().equals(node.id()) && ctx.discovery().alive(node.id())) - expRes++; - } - - if (res.size() >= expRes) - res0 = createRegisterResults(); - } - - if (res0 != null) - onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + void initRemoteNodes(DiscoCache discoCache) { + resCollect.init(discoCache); } /** @@ -2429,27 +2476,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param msg Message. */ void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) { - RoutineRegisterResults res0 = null; - - synchronized (res) { - if (res.containsKey(nodeId) || (topVer != null && res.size() >= expRes)) - return; - - res.put(nodeId, msg); - - if (topVer != null && res.size() >= expRes) - res0 = createRegisterResults(); - } - - if (res0 != null) - onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + resCollect.onMessage(nodeId, msg); } /** * @param nodeId Failed node ID. */ void onNodeFail(UUID nodeId) { - onResult(nodeId, new ContinuousRoutineStartResultMessage(routineId, null, null, false)); + resCollect.onNodeFail(nodeId); } /** @@ -2472,65 +2506,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { onDone(routineId); } - /** - * @return Results. - */ - private RoutineRegisterResults createRegisterResults() { - Map<UUID, Exception> errs = null; - Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null; - - for (Map.Entry<UUID, ContinuousRoutineStartResultMessage> entry : res.entrySet()) { - ContinuousRoutineStartResultMessage msg = entry.getValue(); - - if (msg.error()) { - byte[] errBytes = msg.errorBytes(); - - Exception err = null; - - if (errBytes != null) { - try { - err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous routine start error: " + e); - } - } - - if (err == null) { - err = new IgniteCheckedException("Failed to start continuous " + - "routine on node: " + entry.getKey()); - } - - if (errs == null) - errs = new HashMap<>(); - - errs.put(entry.getKey(), err); - } - else { - byte[] cntrsMapBytes = msg.countersMapBytes(); - - if (cntrsMapBytes != null) { - try { - CachePartitionPartialCountersMap cntrsMap = U.unmarshal( - marsh, - cntrsMapBytes, - U.resolveClassLoader(ctx.config())); - - if (cntrsPerNode == null) - cntrsPerNode = new HashMap<>(); - - cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); - } - catch (Exception e) { - U.warn(log, "Failed to unmarhal continuous query update counters: " + e); - } - } - } - } - - return new RoutineRegisterResults(topVer, errs, cntrsPerNode); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(StartFuture.class, this); @@ -2568,9 +2543,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * Future for stop routine. */ private static class StopFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - /** Timeout object. */ private volatile GridTimeoutObject timeoutObj; @@ -2614,9 +2586,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { */ private static class SyncMessageAckFuture extends GridFutureAdapter<Object> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private UUID nodeId; /**