zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b78f318
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b78f318
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b78f318

Branch: refs/heads/ignite-zk
Commit: 0b78f318b12bb03f4ddce32e7f9c5d4eba8305ca
Parents: 6961ddc
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 28 12:03:34 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 28 12:03:34 2017 +0300

----------------------------------------------------------------------
 .../DiscoveryMessageResultsCollector.java       | 222 +++++++++++++++++++
 .../continuous/GridContinuousProcessor.java     | 191 +++++++---------
 2 files changed, 302 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0b78f318/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
new file mode 100644
index 0000000..72a4636
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class DiscoveryMessageResultsCollector<M, R>  {
+    /** */
+    private final Map<UUID, NodeMessage<M>> rcvd = new HashMap<>();
+
+    /** */
+    private int leftMsgs;
+
+    /** */
+    protected DiscoCache discoCache;
+
+    /** */
+    protected final GridKernalContext ctx;
+
+    /**
+     * @param ctx Context.
+     */
+    protected DiscoveryMessageResultsCollector(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
+     * @param rcvd Received messages.
+     * @return Result.
+     */
+    protected abstract R createResult(Map<UUID, NodeMessage<M>> rcvd);
+
+    /**
+     * @param r Result.
+     */
+    protected abstract void onResultsCollected(R r);
+
+    /**
+     * @param discoCache Discovery state when discovery message was received.
+     * @param node Node.
+     * @return {@code True} if need wait for result from given node.
+     */
+    protected abstract boolean waitForNode(DiscoCache discoCache, ClusterNode 
node);
+
+    /**
+     * @param discoCache Discovery state.
+     */
+    public final void init(DiscoCache discoCache) {
+        assert discoCache != null;
+
+        R res = null;
+
+        synchronized (this) {
+            assert this.discoCache == null;
+            assert leftMsgs == 0 : leftMsgs;
+
+            this.discoCache = discoCache;
+
+            for (ClusterNode node : discoCache.allNodes()) {
+                if (ctx.discovery().alive(node) && waitForNode(discoCache, 
node) && !rcvd.containsKey(node.id())) {
+                    rcvd.put(node.id(), new NodeMessage<>((M)null));
+
+                    leftMsgs++;
+                }
+            }
+
+            if (leftMsgs == 0)
+                res = createResult(rcvd);
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    public final void onMessage(UUID nodeId, M msg) {
+        R res = null;
+
+        synchronized (this) {
+            if (allReceived())
+                return;
+
+            NodeMessage<M> expMsg = rcvd.get(nodeId);
+
+            if (expMsg == null)
+                rcvd.put(nodeId, new NodeMessage<>(msg));
+            else if (expMsg.set(msg)) {
+                assert leftMsgs > 0;
+
+                leftMsgs--;
+
+                if (allReceived())
+                    res = createResult(rcvd);
+            }
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @param nodeId Failed node ID.
+     */
+    public final void onNodeFail(UUID nodeId) {
+        R res = null;
+
+        synchronized (this) {
+            if (allReceived())
+                return;
+
+            NodeMessage expMsg = rcvd.get(nodeId);
+
+            if (expMsg != null && expMsg.onNodeFailed()) {
+                assert leftMsgs > 0 : leftMsgs;
+
+                leftMsgs--;
+
+                if (allReceived())
+                    res = createResult(rcvd);
+            }
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @return {@code True} if expected messages are initialized and all 
message are received.
+     */
+    private boolean allReceived() {
+        return discoCache != null && leftMsgs == 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoveryMessageResultsCollector.class, this);
+    }
+
+    /**
+     *
+     */
+    protected static class NodeMessage<M> {
+        /** */
+        boolean nodeFailed;
+
+        /** */
+        M msg;
+
+        /**
+         * @param msg Message.
+         */
+        NodeMessage(M msg) {
+            this.msg = msg;
+        }
+
+        /**
+         * @return Message or {@code null} if node failed.
+         */
+        @Nullable public M message() {
+            return msg;
+        }
+
+        /**
+         * @return {@code True} if node result was not set before.
+         */
+        boolean onNodeFailed() {
+            if (nodeFailed || msg != null)
+                return false;
+
+            nodeFailed = true;
+
+            return false;
+        }
+
+        /**
+         * @param msg Received message.
+         * @return {@code True} if node result was not set before.
+         */
+        boolean set(M msg) {
+            assert msg != null;
+
+            if (this.msg != null)
+                return false;
+
+            this.msg = msg;
+
+            return !nodeFailed;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NodeMessage.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0b78f318/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index ee12de3..950ac7e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -49,12 +49,13 @@ import 
org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import 
org.apache.ignite.internal.managers.discovery.DiscoveryMessageResultsCollector;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -1403,7 +1404,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         routinesInfo.addRoutineInfo(routineInfo);
 
-        final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer);
+        final DiscoCache discoCache = ctx.discovery().discoCache(topVer);
 
         // Should not use marshaller and send messages from discovery thread.
         ctx.getSystemExecutorService().execute(new Runnable() {
@@ -1412,7 +1413,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     StartFuture fut = startFuts.get(msg.routineId());
 
                     if (fut != null)
-                        fut.initRemoteNodes(topVer, nodes);
+                        fut.initRemoteNodes(discoCache);
 
                     return;
                 }
@@ -2340,19 +2341,83 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         private volatile boolean rmt;
 
         /** */
-        private AffinityTopologyVersion topVer;
-
-        /** */
-        private int expRes;
-
-        /** */
-        private final Map<UUID, ContinuousRoutineStartResultMessage> res = new 
HashMap<>();
+        private final 
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, 
RoutineRegisterResults>
+            resCollect;
 
         /**
          * @param routineId Consume ID.
          */
         StartFuture(UUID routineId) {
             this.routineId = routineId;
+
+            resCollect = new 
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, 
RoutineRegisterResults>(ctx) {
+                @Override protected RoutineRegisterResults 
createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
+                    Map<UUID, Exception> errs = null;
+                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = 
null;
+
+                    for (Map.Entry<UUID, 
NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
+                        ContinuousRoutineStartResultMessage msg = 
entry.getValue().message();
+
+                        if (msg == null)
+                            continue;
+
+                        if (msg.error()) {
+                            byte[] errBytes = msg.errorBytes();
+
+                            Exception err = null;
+
+                            if (errBytes != null) {
+                                try {
+                                    err = U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Failed to unmarhal continuous 
routine start error: " + e);
+                                }
+                            }
+
+                            if (err == null) {
+                                err = new IgniteCheckedException("Failed to 
start continuous " +
+                                    "routine on node: " + entry.getKey());
+                            }
+
+                            if (errs == null)
+                                errs = new HashMap<>();
+
+                            errs.put(entry.getKey(), err);
+                        }
+                        else {
+                            byte[] cntrsMapBytes = msg.countersMapBytes();
+
+                            if (cntrsMapBytes != null) {
+                                try {
+                                    CachePartitionPartialCountersMap cntrsMap 
= U.unmarshal(
+                                        marsh,
+                                        cntrsMapBytes,
+                                        U.resolveClassLoader(ctx.config()));
+
+                                    if (cntrsPerNode == null)
+                                        cntrsPerNode = new HashMap<>();
+
+                                    cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Failed to unmarhal continuous 
query update counters: " + e);
+                                }
+                            }
+                        }
+                    }
+
+                    return new RoutineRegisterResults(discoCache.version(), 
errs, cntrsPerNode);
+                }
+
+                @Override protected void 
onResultsCollected(RoutineRegisterResults res0) {
+                    onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+                }
+
+                @Override protected boolean waitForNode(DiscoCache discoCache, 
ClusterNode node) {
+                    return !ctx.localNodeId().equals(node.id());
+                }
+            };
         }
 
         /**
@@ -2400,28 +2465,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         }
 
         /**
-         * @param topVer Topology version.
-         * @param nodes Nodes.
+         * @param discoCache Discovery state.
          */
-        void initRemoteNodes(AffinityTopologyVersion topVer, 
Collection<ClusterNode> nodes) {
-            RoutineRegisterResults res0 = null;
-
-            synchronized (res) {
-                assert this.topVer == null && expRes == 0;
-
-                this.topVer = topVer;
-
-                for (ClusterNode node : nodes) {
-                    if (!ctx.localNodeId().equals(node.id()) && 
ctx.discovery().alive(node.id()))
-                        expRes++;
-                }
-
-                if (res.size() >= expRes)
-                    res0 = createRegisterResults();
-            }
-
-            if (res0 != null)
-                onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+        void initRemoteNodes(DiscoCache discoCache) {
+            resCollect.init(discoCache);
         }
 
         /**
@@ -2429,27 +2476,14 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
          * @param msg Message.
          */
         void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) {
-            RoutineRegisterResults res0 = null;
-
-            synchronized (res) {
-                if (res.containsKey(nodeId) || (topVer != null && res.size() 
>= expRes))
-                    return;
-
-                res.put(nodeId, msg);
-
-                if (topVer != null && res.size() >= expRes)
-                    res0 = createRegisterResults();
-            }
-
-            if (res0 != null)
-                onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+            resCollect.onMessage(nodeId, msg);
         }
 
         /**
          * @param nodeId Failed node ID.
          */
         void onNodeFail(UUID nodeId) {
-            onResult(nodeId, new 
ContinuousRoutineStartResultMessage(routineId, null, null, false));
+            resCollect.onNodeFail(nodeId);
         }
 
         /**
@@ -2472,65 +2506,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 onDone(routineId);
         }
 
-        /**
-         * @return Results.
-         */
-        private RoutineRegisterResults createRegisterResults() {
-            Map<UUID, Exception> errs = null;
-            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null;
-
-            for (Map.Entry<UUID, ContinuousRoutineStartResultMessage> entry : 
res.entrySet()) {
-                ContinuousRoutineStartResultMessage msg = entry.getValue();
-
-                if (msg.error()) {
-                    byte[] errBytes = msg.errorBytes();
-
-                    Exception err = null;
-
-                    if (errBytes != null) {
-                        try {
-                            err = U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
-                        }
-                        catch (Exception e) {
-                            U.warn(log, "Failed to unmarhal continuous routine 
start error: " + e);
-                        }
-                    }
-
-                    if (err == null) {
-                        err = new IgniteCheckedException("Failed to start 
continuous " +
-                            "routine on node: " + entry.getKey());
-                    }
-
-                    if (errs == null)
-                        errs = new HashMap<>();
-
-                    errs.put(entry.getKey(), err);
-                }
-                else {
-                    byte[] cntrsMapBytes = msg.countersMapBytes();
-
-                    if (cntrsMapBytes != null) {
-                        try {
-                            CachePartitionPartialCountersMap cntrsMap = 
U.unmarshal(
-                                marsh,
-                                cntrsMapBytes,
-                                U.resolveClassLoader(ctx.config()));
-
-                            if (cntrsPerNode == null)
-                                cntrsPerNode = new HashMap<>();
-
-                            cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
-                        }
-                        catch (Exception e) {
-                            U.warn(log, "Failed to unmarhal continuous query 
update counters: " + e);
-                        }
-                    }
-                }
-            }
-
-            return new RoutineRegisterResults(topVer, errs, cntrsPerNode);
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(StartFuture.class, this);
@@ -2568,9 +2543,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * Future for stop routine.
      */
     private static class StopFuture extends GridFutureAdapter<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
         /** Timeout object. */
         private volatile GridTimeoutObject timeoutObj;
 
@@ -2614,9 +2586,6 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      */
     private static class SyncMessageAckFuture extends 
GridFutureAdapter<Object> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private UUID nodeId;
 
         /**

Reply via email to