zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e8f85ff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e8f85ff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e8f85ff

Branch: refs/heads/ignite-zk
Commit: 7e8f85ff81f1cfc0f163f86d623f7e558177aa2c
Parents: 376a484
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 14 11:55:56 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 14 13:56:54 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkAbstractChildrenCallback.java |   8 +
 .../zk/internal/ZkCollectDistributedFuture.java | 171 ---------
 .../ZkCommunicationErrorProcessFuture.java      | 114 ++++--
 ...kCommunicationErrorResolveFinishMessage.java |  23 +-
 .../ZkCommunicationErrorResolveResult.java      |  37 ++
 ...ZkCommunicationErrorResolveStartMessage.java |   6 +
 .../zk/internal/ZkDiscoveryCustomEventData.java |  19 +-
 .../ZkDistributedCollectDataFuture.java         | 223 ++++++++++++
 .../discovery/zk/internal/ZkIgnitePaths.java    |  20 +-
 .../discovery/zk/internal/ZookeeperClient.java  |   5 +
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 360 +++++++++++--------
 .../ZookeeperDiscoverySpiBasicTest.java         | 176 ++++++++-
 12 files changed, 808 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
index 5679993..2292e35 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
@@ -48,6 +48,14 @@ abstract class ZkAbstractChildrenCallback extends 
ZkAbstractCallabck implements
         }
     }
 
+    /**
+     * @param rc
+     * @param path
+     * @param ctx
+     * @param children
+     * @param stat
+     * @throws Exception If failed.
+     */
     abstract void processResult0(int rc, String path, Object ctx, List<String> 
children, Stat stat)
         throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
deleted file mode 100644
index fa529cf..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-class ZkCollectDistributedFuture extends GridFutureAdapter<Void> {
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final String futPath;
-
-    /** */
-    private final ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private final Set<Long> remainingNodes;
-
-    /** */
-    private final Callable<Void> lsnr;
-
-    /**
-     * @param impl
-     * @param rtState
-     * @param futPath
-     */
-    ZkCollectDistributedFuture(ZookeeperDiscoveryImpl impl, ZkRuntimeState 
rtState, String futPath, Callable<Void> lsnr) throws Exception {
-        this.impl = impl;
-        this.log = impl.log();
-        this.futPath = futPath;
-        this.lsnr = lsnr;
-
-        ZkClusterNodes top = impl.nodes();
-
-        remainingNodes = U.newHashSet(top.nodesByOrder.size());
-
-        for (ZookeeperClusterNode node : top.nodesByInternalId.values())
-            remainingNodes.add(node.order());
-
-        NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
-
-        if (remainingNodes.isEmpty())
-            completeAndNotifyListener();
-        else
-            rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
-    }
-
-    /**
-     * @throws Exception If listener call failed.
-     */
-    private void completeAndNotifyListener() throws Exception {
-        if (super.onDone())
-            lsnr.call();
-    }
-
-    /**
-     * @param futPath
-     * @param client
-     * @param nodeOrder
-     * @param data
-     * @throws Exception If failed.
-     */
-    static void saveNodeResult(String futPath, ZookeeperClient client, long 
nodeOrder, byte[] data) throws Exception {
-        client.createIfNeeded(futPath + "/" + nodeOrder, data, 
CreateMode.PERSISTENT);
-    }
-
-    /**
-     * @param node Failed node.
-     */
-    void onNodeFail(ZookeeperClusterNode node) throws Exception {
-        long nodeOrder = node.order();
-
-        if (remainingNodes.remove(nodeOrder)) {
-            int remaining = remainingNodes.size();
-
-            if (log.isInfoEnabled()) {
-                log.info("ZkCollectDistributedFuture removed remaining failed 
node [node=" + nodeOrder +
-                    ", remaining=" + remaining +
-                    ", futPath=" + futPath + ']');
-            }
-
-            if (remaining == 0)
-                completeAndNotifyListener();
-        }
-    }
-
-    /**
-     *
-     */
-    class NodeResultsWatcher extends ZkAbstractWatcher implements 
AsyncCallback.Children2Callback {
-        /**
-         * @param rtState Runtime state.
-         * @param impl Discovery impl.
-         */
-        NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl 
impl) {
-            super(rtState, impl);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void process0(WatchedEvent evt) {
-            if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
-                impl.zkClient().getChildrenAsync(evt.getPath(), this, this);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-            if (!onProcessStart())
-                return;
-
-            try {
-                assert rc == 0 : KeeperException.Code.get(rc);
-
-                if (isDone())
-                    return;
-
-                for (int i = 0; i < children.size(); i++) {
-                    Long nodeOrder = Long.parseLong(children.get(i));
-
-                    if (remainingNodes.remove(nodeOrder)) {
-                        int remaining = remainingNodes.size();
-
-                        if (log.isInfoEnabled()) {
-                            log.info("ZkCollectDistributedFuture added new 
result [node=" + nodeOrder +
-                                ", remaining=" + remaining +
-                                ", futPath=" + path + ']');
-                        }
-
-                        if (remaining == 0)
-                            completeAndNotifyListener();
-                    }
-                }
-
-                onProcessEnd();
-            }
-            catch (Throwable e) {
-                onProcessError(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index d87f500..15744a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -18,11 +18,10 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -31,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,7 +41,10 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
     private final ZookeeperDiscoveryImpl impl;
 
     /** */
-    private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new 
HashMap<>();
+    private final IgniteLogger log;
+
+    /** */
+    private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new 
ConcurrentHashMap<>();
 
     /** */
     private final long endTime;
@@ -59,7 +62,10 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
     private Set<Long> resFailedNodes;
 
     /** */
-    private ZkCollectDistributedFuture nodeResFut;
+    private Exception resErr;
+
+    /** */
+    private ZkDistributedCollectDataFuture collectResFut;
 
     /**
      * @param impl Discovery impl.
@@ -87,6 +93,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
         assert state != State.DONE;
 
         this.impl = impl;
+        this.log = impl.log();
 
         if (state == State.WAIT_TIMEOUT) {
             assert timeout > 0 : timeout;
@@ -102,15 +109,44 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
         this.state = state;
     }
 
-    void nodeResultCollectFuture(ZkCollectDistributedFuture nodeResFut) {
-        assert nodeResFut == null : nodeResFut;
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteLogger logger() {
+        return log;
+    }
 
-        this.nodeResFut = nodeResFut;
+    /**
+     * @param collectResFut Collect nodes' communication status future.
+     */
+    void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) 
{
+        assert this.collectResFut == null : collectResFut;
+
+        this.collectResFut = collectResFut;
     }
 
+    /**
+     * @param top Topology.
+     * @throws Exception If failed.
+     */
+    void onTopologyChange(ZkClusterNodes top) throws Exception {
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : 
nodeFuts.entrySet()) {
+            if (!top.nodesByOrder.containsKey(e.getKey()))
+                e.getValue().onDone(false);
+        }
+
+        if (collectResFut != null)
+            collectResFut.onTopologyChange(top);
+    }
+
+    /**
+     * @param locNodeOrder Local node order.
+     * @param rtState Runtime state.
+     * @param futPath Future path.
+     * @param nodes Nodes to ping.
+     * @throws Exception If failed.
+     */
     void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, 
String futPath, Collection<ClusterNode> nodes)
         throws Exception {
-        ZkCollectDistributedFuture.saveNodeResult(futPath, rtState.zkClient, 
locNodeOrder, null);
+        ZkDistributedCollectDataFuture.saveNodeResult(futPath, 
rtState.zkClient, locNodeOrder, null);
     }
 
     /**
@@ -146,6 +182,37 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
     }
 
     /**
+     * @param failedNodes Node failed as result of resolve process.
+     */
+    void onFinishResolve(Set<Long> failedNodes) {
+        Map<Long, GridFutureAdapter<Boolean>> futs;
+
+        synchronized (this) {
+            if (state == State.DONE) {
+                assert resErr != null;
+
+                return;
+            }
+
+            assert state == State.RESOLVE_STARTED : state;
+
+            state = State.DONE;
+
+            resFailedNodes = failedNodes;
+
+            futs = nodeFuts; // nodeFuts should not be modified after state 
changed to DONE.
+        }
+
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) {
+            Boolean res = !F.contains(resFailedNodes, e.getKey());
+
+            e.getValue().onDone(res);
+        }
+
+        onDone();
+    }
+
+    /**
      * @param node Node.
      * @return Future finished when communication error resolve is done or 
{@code null} if another
      *      resolve process should be started.
@@ -176,27 +243,19 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
         return fut;
     }
 
-    /**
-     * @param node Failed node.
-     */
-    void onNodeFailed(ClusterNode node) {
-        GridFutureAdapter<Boolean> fut = null;
-
-        synchronized (this) {
-            if (state == State.WAIT_TIMEOUT)
-                fut = nodeFuts.get(node.order());
-        }
-
-        if (fut != null)
-            fut.onDone(false);
-    }
-
     /** {@inheritDoc} */
     @Override public void run() {
         // Run from zk discovery worker pool after timeout.
-        if (processTimeout()) {
+        if (needProcessTimeout()) {
             try {
-                impl.sendCustomMessage(new 
ZkCommunicationErrorResolveStartMessage(UUID.randomUUID()));
+                UUID reqId = UUID.randomUUID();
+
+                if (log.isInfoEnabled()) {
+                    log.info("Initiate cluster-wide communication error 
resolve process [reqId=" + reqId +
+                        ", errNodes=" + nodeFuts.size() + ']');
+                }
+
+                impl.sendCustomMessage(new 
ZkCommunicationErrorResolveStartMessage(reqId));
             }
             catch (Exception e) {
                 Collection<GridFutureAdapter<Boolean>> futs;
@@ -206,6 +265,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
                         return;
 
                     state = State.DONE;
+                    resErr = e;
 
                     futs = nodeFuts.values(); // nodeFuts should not be 
modified after state changed to DONE.
                 }
@@ -221,7 +281,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
     /**
      * @return {@code True} if need initiate resolve process after timeout 
expired.
      */
-    private boolean processTimeout() {
+    private boolean needProcessTimeout() {
         synchronized (this) {
             if (state != State.WAIT_TIMEOUT)
                 return false;
@@ -251,7 +311,7 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter<Void> implemen
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        if (processTimeout())
+        if (needProcessTimeout())
             impl.runInWorkerThread(this);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
index 144a5bf..20aeddf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -18,21 +18,42 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-class ZkCommunicationErrorResolveFinishMessage implements ZkInternalMessage {
+class ZkCommunicationErrorResolveFinishMessage implements 
DiscoverySpiCustomMessage, ZkInternalMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
     final UUID futId;
 
+    /** */
+    transient ZkCommunicationErrorResolveResult res;
+
     /**
      * @param futId Future ID.
      */
     ZkCommunicationErrorResolveFinishMessage(UUID futId) {
         this.futId = futId;
     }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorResolveFinishMessage.class, 
this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
new file mode 100644
index 0000000..745496b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.GridLongList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkCommunicationErrorResolveResult implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    final GridLongList failedNodes;
+
+    ZkCommunicationErrorResolveResult(@Nullable GridLongList failedNodes) {
+        this.failedNodes = failedNodes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
index e619d7b..e85277b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.Nullable;
 
@@ -47,4 +48,9 @@ public class ZkCommunicationErrorResolveStartMessage 
implements DiscoverySpiCust
     @Override public boolean isMutable() {
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorResolveStartMessage.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
index 0d2288c..6375bc7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java
@@ -38,22 +38,33 @@ class ZkDiscoveryCustomEventData extends 
ZkDiscoveryEventData {
     /** */
     final String evtPath;
 
-    /** */
-    transient DiscoverySpiCustomMessage msg;
+    /** Message instance (can be marshalled as part of 
ZkDiscoveryCustomEventData or stored in separate znode). */
+    DiscoverySpiCustomMessage msg;
+
+    /** Unmarshalled message. */
+    transient DiscoverySpiCustomMessage resolvedMsg;
 
     /**
      * @param evtId Event ID.
      * @param topVer Topology version.
      * @param sndNodeId Sender node ID.
+     * @param msg Message instance.
      * @param evtPath Event path.
      * @param ack Acknowledge event flag.
      */
-    ZkDiscoveryCustomEventData(long evtId, long topVer, UUID sndNodeId, String 
evtPath, boolean ack) {
+    ZkDiscoveryCustomEventData(long evtId,
+        long topVer,
+        UUID sndNodeId,
+        DiscoverySpiCustomMessage msg,
+        String evtPath,
+        boolean ack)
+    {
         super(evtId, DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, topVer);
 
         assert sndNodeId != null;
-        assert ack || !F.isEmpty(evtPath);
+        assert msg != null || ack || !F.isEmpty(evtPath);
 
+        this.msg = msg;
         this.sndNodeId = sndNodeId;
         this.evtPath = evtPath;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
new file mode 100644
index 0000000..d33001b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ */
+class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final String futPath;
+
+    /** */
+    private final Set<Long> remainingNodes;
+
+    /** */
+    private final Callable<Void> lsnr;
+
+    /**
+     * @param impl Discovery impl.
+     * @param rtState Runtime state.
+     * @param futPath Future path.
+     * @param lsnr Future listener.
+     * @throws Exception If listener call failed.
+     */
+    ZkDistributedCollectDataFuture(
+        ZookeeperDiscoveryImpl impl,
+        ZkRuntimeState rtState,
+        String futPath,
+        Callable<Void> lsnr)
+        throws Exception
+    {
+        this.log = impl.log();
+        this.futPath = futPath;
+        this.lsnr = lsnr;
+
+        ZkClusterNodes top = rtState.top;
+
+        remainingNodes = U.newHashSet(top.nodesByOrder.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values())
+            remainingNodes.add(node.order());
+
+        NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
+
+        if (remainingNodes.isEmpty())
+            completeAndNotifyListener();
+        else {
+            if (log.isInfoEnabled())
+                log.info("Initialize data collect future [futPath=" + futPath 
+ ", nodes=" + remainingNodes.size() + ']');
+
+            rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
+        }
+    }
+
+    /**
+     * @throws Exception If listener call failed.
+     */
+    private void completeAndNotifyListener() throws Exception {
+        if (super.onDone())
+            lsnr.call();
+    }
+
+    /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @param data
+     * @throws Exception If failed.
+     */
+    static void saveNodeResult(String futPath, ZookeeperClient client, long 
nodeOrder, byte[] data) throws Exception {
+        client.createIfNeeded(futPath + "/" + nodeOrder, data, 
CreateMode.PERSISTENT);
+    }
+
+    /**
+     * @param futResPath Result path.
+     * @param client Client.
+     * @param data Result data.
+     * @throws Exception If failed.
+     */
+    static void saveResult(String futResPath, ZookeeperClient client, byte[] 
data) throws Exception {
+        client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT);
+    }
+
+    static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID 
futId) throws Exception {
+        return client.getData(paths.distributedFutureResultPath(futId));
+    }
+
+    /**
+     * @param client Client.
+     * @param paths Paths utils.
+     * @param futId Future ID.
+     * @throws Exception If failed.
+     */
+    static void deleteFutureData(ZookeeperClient client, ZkIgnitePaths paths, 
UUID futId) throws Exception {
+        // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
+        String evtDir = paths.distributedFutureBasePath(futId);
+
+        client.deleteAll(evtDir,
+            client.getChildren(evtDir),
+            -1);
+
+        client.deleteIfExists(evtDir, -1);
+
+        client.deleteIfExists(paths.distributedFutureResultPath(futId), -1);
+    }
+
+    /**
+     * @param top Current topology.
+     * @throws Exception If listener call failed.
+     */
+    void onTopologyChange(ZkClusterNodes top) throws Exception {
+        if (remainingNodes.isEmpty())
+            return;
+
+        for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) {
+            Long nodeOrder = it.next();
+
+            if (!top.nodesByOrder.containsKey(nodeOrder)) {
+                it.remove();
+
+                int remaining = remainingNodes.size();
+
+                if (log.isInfoEnabled()) {
+                    log.info("ZkDistributedCollectDataFuture removed remaining 
failed node [node=" + nodeOrder +
+                        ", remaining=" + remaining +
+                        ", futPath=" + futPath + ']');
+                }
+
+                if (remaining == 0) {
+                    completeAndNotifyListener();
+
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    class NodeResultsWatcher extends ZkAbstractWatcher implements 
AsyncCallback.Children2Callback {
+        /**
+         * @param rtState Runtime state.
+         * @param impl Discovery impl.
+         */
+        NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl 
impl) {
+            super(rtState, impl);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void process0(WatchedEvent evt) {
+            if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
+                rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            if (!onProcessStart())
+                return;
+
+            try {
+                if (!isDone()) {
+                    assert rc == 0 : KeeperException.Code.get(rc);
+
+                    for (int i = 0; i < children.size(); i++) {
+                        Long nodeOrder = Long.parseLong(children.get(i));
+
+                        if (remainingNodes.remove(nodeOrder)) {
+                            int remaining = remainingNodes.size();
+
+                            if (log.isInfoEnabled()) {
+                                log.info("ZkDistributedCollectDataFuture added 
new result [node=" + nodeOrder +
+                                    ", remaining=" + remaining +
+                                    ", futPath=" + path + ']');
+                            }
+
+                            if (remaining == 0)
+                                completeAndNotifyListener();
+                        }
+                    }
+                }
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 06c5d9e..2a1d804 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -26,22 +26,22 @@ class ZkIgnitePaths {
     /** */
     private static final int UUID_LEN = 36;
 
-    /** */
+    /** Directory to store joined node data. */
     private static final String JOIN_DATA_DIR = "jd";
 
-    /** */
+    /** Directory to store new custom events. */
     private static final String CUSTOM_EVTS_DIR = "ce";
 
-    /** */
+    /** Directory to store parts of multi-part custom events. */
     private static final String CUSTOM_EVTS_PARTS_DIR = "cp";
 
-    /** */
+    /** Directory to store acknowledgement messages for custom events. */
     private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
 
-    /** */
+    /** Directory to store EPHEMERAL znodes for alive cluster nodes. */
     static final String ALIVE_NODES_DIR = "n";
 
-    /** */
+    /** Path to store discovery events {@link ZkDiscoveryEventsData}. */
     private static final String DISCO_EVENTS_PATH = "e";
 
     /** */
@@ -300,4 +300,12 @@ class ZkIgnitePaths {
     String distributedFutureBasePath(UUID id) {
         return evtsPath + "/f-" + id;
     }
+
+    /**
+     * @param id Future ID.
+     * @return Future result path.
+     */
+    String distributedFutureResultPath(UUID id) {
+        return evtsPath + "/fr-" + id;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index a83886a..5923b39 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -321,6 +321,11 @@ public class ZookeeperClient implements Watcher {
         return parts;
     }
 
+    /**
+     * TODO ZK: this estimate does not always seem precise, e.g. if ACL is used?
+     * @param path Request path.
+     * @return Marshalled request overhead.
+     */
     private int requestOverhead(String path) {
         return path.length();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index a153d11..82d9c4b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -54,7 +54,6 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.MarshallerUtils;
@@ -72,7 +71,6 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.jetbrains.annotations.Nullable;
@@ -151,7 +149,10 @@ public class ZookeeperDiscoveryImpl {
     public volatile IgniteDiscoverySpiInternalListener internalLsnr;
 
     /** */
-    private final ConcurrentHashMap<UUID, PingFuture> pingFuts = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, PingFuture> pingFuts = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
 
     /**
      * @param spi Discovery SPI.
@@ -230,9 +231,9 @@ public class ZookeeperDiscoveryImpl {
         return rtState.top.nodesByOrder.get(nodeOrder);
     }
 
-    /** */
-    private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
-
+    /**
+     * @param fut Future to remove.
+     */
     void 
clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) {
         assert fut.isDone() : fut;
 
@@ -262,6 +263,11 @@ public class ZookeeperDiscoveryImpl {
                 if (commErrProcFut.compareAndSet(fut, newFut)) {
                     fut = newFut;
 
+                    if (log.isInfoEnabled()) {
+                        log.info("Created new communication error process 
future [errNode=" + node0.id() +
+                            ", err= " + err + ']');
+                    }
+
                     fut.scheduleCheckOnTimeout();
                 }
                 else
@@ -295,7 +301,9 @@ public class ZookeeperDiscoveryImpl {
      * @return Ping result.
      */
     public boolean pingNode(UUID nodeId) {
-        ZookeeperClusterNode node = node(nodeId);
+        ZkRuntimeState rtState = this.rtState;
+
+        ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
 
         if (node == null)
             return false;
@@ -303,12 +311,12 @@ public class ZookeeperDiscoveryImpl {
         if (node.isLocal())
             return true;
 
-        PingFuture fut = pingFuts.get(nodeId);
+        PingFuture fut = pingFuts.get(node.order());
 
         if (fut == null) {
-            fut = new PingFuture(node);
+            fut = new PingFuture(rtState, node);
 
-            PingFuture old = pingFuts.putIfAbsent(nodeId, fut);
+            PingFuture old = pingFuts.putIfAbsent(node.order(), fut);
 
             if (old == null) {
                 if (fut.checkNodeAndState())
@@ -419,7 +427,7 @@ public class ZookeeperDiscoveryImpl {
                 connState = ConnectionState.STOPPED;
             }
 
-            zkClient().zk().sync(zkPaths.clusterDir, new SegmentedWatcher(), 
null);
+            rtState.zkClient.zk().sync(zkPaths.clusterDir, new 
SegmentedWatcher(), null);
         }
         else
             joinFut.onDone(e);
@@ -530,33 +538,7 @@ public class ZookeeperDiscoveryImpl {
         try {
             ZookeeperClient zkClient = rtState.zkClient;
 
-            String prefix = UUID.randomUUID().toString();
-
-            int partCnt = 1;
-
-            int overhead = 10;
-
-            UUID locId = locNode.id();
-
-            String path = zkPaths.createCustomEventPath(prefix, locId, 
partCnt);
-
-            if (zkClient.needSplitNodeData(path, msgBytes, overhead)) {
-                List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, 
overhead);
-
-                String partsBasePath = 
zkPaths.customEventPartsBasePath(prefix, locId);
-
-                saveMultipleParts(zkClient, partsBasePath, parts);
-
-                msgBytes = null;
-
-                partCnt = parts.size();
-            }
-
-            zkClient.createSequential(prefix,
-                zkPaths.customEvtsDir,
-                zkPaths.createCustomEventPath(prefix, locId, partCnt),
-                msgBytes,
-                CreateMode.PERSISTENT_SEQUENTIAL);
+            saveCustomMessage(zkClient, msgBytes);
         }
         catch (ZookeeperClientFailedException e) {
             if (clientReconnectEnabled)
@@ -575,6 +557,44 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param zkClient Client.
+     * @param msgBytes Marshalled message.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void saveCustomMessage(ZookeeperClient zkClient, byte[] msgBytes)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        String prefix = UUID.randomUUID().toString();
+
+        int partCnt = 1;
+
+        int overhead = 10;
+
+        UUID locId = locNode.id();
+
+        String path = zkPaths.createCustomEventPath(prefix, locId, partCnt);
+
+        if (zkClient.needSplitNodeData(path, msgBytes, overhead)) {
+            List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, 
overhead);
+
+            String partsBasePath = zkPaths.customEventPartsBasePath(prefix, 
locId);
+
+            saveMultipleParts(zkClient, partsBasePath, parts);
+
+            msgBytes = null;
+
+            partCnt = parts.size();
+        }
+
+        zkClient.createSequential(prefix,
+            zkPaths.customEvtsDir,
+            zkPaths.createCustomEventPath(prefix, locId, partCnt),
+            msgBytes,
+            CreateMode.PERSISTENT_SEQUENTIAL);
+    }
+
+    /**
      * @return Cluster start time.
      */
     public long gridStartTime() {
@@ -859,7 +879,7 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class CheckJoinStateTimeoutObject implements 
IgniteSpiTimeoutObject, Watcher, AsyncCallback.DataCallback {
+    private class CheckJoinStateTimeoutObject extends ZkAbstractWatcher 
implements IgniteSpiTimeoutObject, AsyncCallback.DataCallback {
         /** */
         private final IgniteUuid id = IgniteUuid.randomUuid();
 
@@ -869,16 +889,14 @@ public class ZookeeperDiscoveryImpl {
         /** */
         private final String joinDataPath;
 
-        /** */
-        private final ZkRuntimeState rtState;
-
         /**
          * @param joinDataPath Node joined data path.
          * @param rtState State.
          */
         CheckJoinStateTimeoutObject(String joinDataPath, ZkRuntimeState 
rtState) {
+            super(rtState, ZookeeperDiscoveryImpl.this);
+
             this.joinDataPath = joinDataPath;
-            this.rtState = rtState;
         }
 
         /** {@inheritDoc} */
@@ -909,7 +927,7 @@ public class ZookeeperDiscoveryImpl {
             if (rc != 0)
                 return;
 
-            if (!busyLock.enterBusy())
+            if (!onProcessStart())
                 return;
 
             try {
@@ -921,27 +939,17 @@ public class ZookeeperDiscoveryImpl {
                     onSegmented(new IgniteSpiException(joinErr.err));
                 }
 
-                busyLock.leaveBusy();
+                onProcessEnd();
             }
             catch (Throwable e) {
-                onFatalError(busyLock, e);
+                onProcessError(e);
             }
         }
 
         /** {@inheritDoc} */
-        @Override public void process(WatchedEvent evt) {
-            if (!busyLock.enterBusy())
-                return;
-
-            try {
-                if (evt.getType() == Event.EventType.NodeDataChanged)
-                    rtState.zkClient.getDataAsync(evt.getPath(), this, this);
-
-                busyLock.leaveBusy();
-            }
-            catch (Throwable e) {
-                onFatalError(busyLock, e);
-            }
+        @Override public void process0(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDataChanged)
+                rtState.zkClient.getDataAsync(evt.getPath(), this, this);
         }
     }
 
@@ -1243,18 +1251,20 @@ public class ZookeeperDiscoveryImpl {
             return true;
         }
         else {
+            ZookeeperClient client = rtState.zkClient;
+
             if (joinErr.notifyNode) {
                 String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, 
prefixId);
 
-                zkClient().setData(joinDataPath, marshalZip(joinErr), -1);
+                client.setData(joinDataPath, marshalZip(joinErr), -1);
 
-                zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
+                client.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
             }
             else {
                 if (log.isInfoEnabled())
                     log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
 
-                zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
+                client.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
             }
 
             return false;
@@ -1483,8 +1493,10 @@ public class ZookeeperDiscoveryImpl {
     private void cleanupPreviousClusterData() throws Exception {
         long start = System.currentTimeMillis();
 
-        // TODO ZK: use multi, better batching.
-        rtState.zkClient.setData(zkPaths.evtsPath, null, -1);
+        ZookeeperClient client = rtState.zkClient;
+
+        // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
+        client.setData(zkPaths.evtsPath, null, -1);
 
         List<String> evtChildren = 
rtState.zkClient.getChildren(zkPaths.evtsPath);
 
@@ -1494,10 +1506,10 @@ public class ZookeeperDiscoveryImpl {
             removeChildren(evtDir);
         }
 
-        rtState.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
+        client.deleteAll(zkPaths.evtsPath, evtChildren, -1);
 
-        rtState.zkClient.deleteAll(zkPaths.customEvtsDir,
-            rtState.zkClient.getChildren(zkPaths.customEvtsDir),
+        client.deleteAll(zkPaths.customEvtsDir,
+            client.getChildren(zkPaths.customEvtsDir),
             -1);
 
         rtState.zkClient.deleteAll(zkPaths.customEvtsPartsDir,
@@ -1525,20 +1537,6 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
-     * @return Nodes.
-     */
-    ZkClusterNodes nodes() {
-        return rtState.top;
-    }
-
-    /**
-     * @return Client.
-     */
-    ZookeeperClient zkClient() {
-        return rtState.zkClient;
-    }
-
-    /**
      * @param zkClient Client.
      * @param evtPath Event path.
      * @param sndNodeId Sender node ID.
@@ -1654,7 +1652,8 @@ public class ZookeeperDiscoveryImpl {
                             }
                             else {
                                 if (log.isInfoEnabled()) {
-                                    log.info("Start communication error 
resolve [sndNode=" + sndNode +
+                                    log.info("Start cluster-wide communication 
error resolve [sndNode=" + sndNode +
+                                        ", reqId=" + msg0.id +
                                         ", topVer=" + evtsData.topVer + ']');
                                 }
 
@@ -1672,10 +1671,11 @@ public class ZookeeperDiscoveryImpl {
                             evtsData.evtIdGen,
                             evtsData.topVer,
                             sndNodeId,
+                            null,
                             evtPath,
                             false);
 
-                        evtData.msg = msg;
+                        evtData.resolvedMsg = msg;
 
                         evtsData.addEvent(rtState.top.nodesByOrder.values(), 
evtData);
 
@@ -1746,7 +1746,7 @@ public class ZookeeperDiscoveryImpl {
                         
(ZkDiscoveryCustomEventData)newEvts.evts.get(evtData.eventId());
 
                     if (evtData0 != null)
-                        evtData0.msg = 
((ZkDiscoveryCustomEventData)evtData).msg;
+                        evtData0.resolvedMsg = 
((ZkDiscoveryCustomEventData)evtData).resolvedMsg;
                 }
             }
         }
@@ -1831,25 +1831,31 @@ public class ZookeeperDiscoveryImpl {
                         DiscoverySpiCustomMessage msg;
 
                         if (rtState.crd) {
-                            assert evtData0.msg != null : evtData0;
+                            assert evtData0.resolvedMsg != null : evtData0;
 
-                            msg = evtData0.msg;
+                            msg = evtData0.resolvedMsg;
                         }
                         else {
-                            if (evtData0.ackEvent()) {
-                                String path = 
zkPaths.ackEventDataPath(evtData0.eventId());
+                            if (evtData0.msg == null) {
+                                if (evtData0.ackEvent()) {
+                                    String path = 
zkPaths.ackEventDataPath(evtData0.eventId());
 
-                                msg = unmarshalZip(zkClient.getData(path));
-                            }
-                            else {
-                                byte[] msgBytes = readCustomEventData(zkClient,
-                                    evtData0.evtPath,
-                                    evtData0.sndNodeId);
+                                    msg = unmarshalZip(zkClient.getData(path));
+                                }
+                                else {
+                                    assert evtData0.evtPath != null : evtData0;
 
-                                msg = unmarshalZip(msgBytes);
+                                    byte[] msgBytes = 
readCustomEventData(zkClient,
+                                        evtData0.evtPath,
+                                        evtData0.sndNodeId);
+
+                                    msg = unmarshalZip(msgBytes);
+                                }
                             }
+                            else
+                                msg = evtData0.msg;
 
-                            evtData0.msg = msg;
+                            evtData0.resolvedMsg = msg;
                         }
 
                         if (msg instanceof ZkInternalMessage)
@@ -1889,6 +1895,11 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
         }
+
+        ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
+
+        if (commErrFut != null)
+            commErrFut.onTopologyChange(rtState.top); // This can add new 
event, so notify outside of the event processing loop.
     }
 
     /**
@@ -1973,12 +1984,13 @@ public class ZookeeperDiscoveryImpl {
         if (msg instanceof ZkForceNodeFailMessage)
             processForceNodeFailMessage((ZkForceNodeFailMessage)msg, evtData);
         else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
-            processStartResolveCommunicationErrorMessage(
+            processCommunicationErrorResolveStartMessage(
                 (ZkCommunicationErrorResolveStartMessage)msg,
                 evtData);
         }
         else if (msg instanceof ZkCommunicationErrorResolveFinishMessage) {
-            ZkCommunicationErrorResolveFinishMessage msg0 = 
(ZkCommunicationErrorResolveFinishMessage)msg;
+            processCommunicationErrorResolveFinishMessage(
+                (ZkCommunicationErrorResolveFinishMessage)msg);
         }
     }
 
@@ -2012,9 +2024,48 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param msg Message.
+     * @throws Exception If failed.
+     */
+    private void 
processCommunicationErrorResolveFinishMessage(ZkCommunicationErrorResolveFinishMessage
 msg)
+        throws Exception
+    {
+        UUID futId = msg.futId;
+
+        assert futId != null;
+        assert 
futId.equals(rtState.evtsData.communicationErrorResolveFutureId());
+
+        if (log.isInfoEnabled())
+            log.info("Received communication error resolve finish message 
[reqId=" + futId + ']');
+
+        rtState.evtsData.communicationErrorResolveFutureId(null);
+
+        ZkCommunicationErrorResolveResult res = msg.res;
+
+        if (res == null)
+            res = 
unmarshalZip(ZkDistributedCollectDataFuture.readResult(rtState.zkClient, 
zkPaths, futId));
+
+        ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+
+        assert fut != null;
+
+        Set<Long> failedNodes = null;
+
+        if (res.failedNodes != null) {
+            failedNodes = U.newHashSet(res.failedNodes.size());
+
+            for (int i = 0; i < res.failedNodes.size(); i++)
+                failedNodes.add(res.failedNodes.get(i));
+        }
+
+        fut.onFinishResolve(failedNodes);
+    }
+
+    /**
+     * @param msg Message.
      * @param evtData Event data.
+     * @throws Exception If failed.
      */
-    private void 
processStartResolveCommunicationErrorMessage(ZkCommunicationErrorResolveStartMessage
 msg,
+    private void 
processCommunicationErrorResolveStartMessage(ZkCommunicationErrorResolveStartMessage
 msg,
         ZkDiscoveryCustomEventData evtData) throws Exception {
         ZkCommunicationErrorProcessFuture fut;
 
@@ -2043,6 +2094,11 @@ public class ZookeeperDiscoveryImpl {
             }
         }
 
+        if (log.isInfoEnabled()) {
+            log.info("Received communication error resolve request [reqId=" + 
msg.id +
+                ", topVer=" + rtState.top.topologySnapshot() + ']');
+        }
+
         assert !fut.isDone() : fut;
 
         final String futPath = zkPaths.distributedFutureBasePath(msg.id);
@@ -2050,7 +2106,7 @@ public class ZookeeperDiscoveryImpl {
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         if (rtState.crd) {
-            ZkCollectDistributedFuture nodeResFut = new 
ZkCollectDistributedFuture(this, rtState, futPath,
+            ZkDistributedCollectDataFuture nodeResFut = new 
ZkDistributedCollectDataFuture(this, rtState, futPath,
                 new Callable<Void>() {
                     @Override public Void call() throws Exception {
                         // Future is completed from ZK event thread.
@@ -2075,25 +2131,34 @@ public class ZookeeperDiscoveryImpl {
      * @param rtState Runtime state.
      * @throws Exception If failed.
      */
-    void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws 
Exception {
+    private void finishCommunicationResolveProcess(ZkRuntimeState rtState) 
throws Exception {
         ZkDiscoveryEventsData evtsData = rtState.evtsData;
 
         UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
 
         assert futId != null;
 
-        rtState.evtsData.communicationErrorResolveFutureId(null);
-
         ZkCommunicationErrorResolveFinishMessage msg = new 
ZkCommunicationErrorResolveFinishMessage(futId);
 
+        ZkCommunicationErrorResolveResult res = new 
ZkCommunicationErrorResolveResult(null);
+
+        msg.res = res;
+
+        
ZkDistributedCollectDataFuture.saveResult(zkPaths.distributedFutureResultPath(futId),
+            rtState.zkClient,
+            marshalZip(res));
+
+        evtsData.evtIdGen++;
+
         ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
             evtsData.evtIdGen,
             evtsData.topVer,
             locNode.id(),
+            msg,
             null,
             false);
 
-        evtData.msg = msg;
+        evtData.resolvedMsg = msg;
 
         evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
 
@@ -2104,11 +2169,15 @@ public class ZookeeperDiscoveryImpl {
      *
      */
     public void simulateNodeFailure() {
-        zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir);
+        ZkRuntimeState rtState = this.rtState;
+
+        ZookeeperClient client = rtState.zkClient;
+
+        client.deleteIfExistsAsync(zkPaths.aliveNodesDir);
 
         rtState.onCloseStart();
 
-        zkClient().close();
+        rtState.zkClient.close();
     }
 
     /**
@@ -2211,16 +2280,11 @@ public class ZookeeperDiscoveryImpl {
             throw new ZookeeperClientFailedException("Received node failed 
event for local node.");
         }
         else {
-            PingFuture pingFut = pingFuts.get(failedNode.id());
+            PingFuture pingFut = pingFuts.get(failedNode.order());
 
             if (pingFut != null)
                 pingFut.onDone(false);
 
-            ZkCommunicationErrorProcessFuture commErrFut = 
commErrProcFut.get();
-
-            if (commErrFut != null)
-                commErrFut.onNodeFailed(failedNode);
-
             final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
 
             lsnr.onDiscovery(EVT_NODE_FAILED,
@@ -2291,9 +2355,10 @@ public class ZookeeperDiscoveryImpl {
                                 evtData.topologyVersion(), // Use topology 
version from original event.
                                 locNode.id(),
                                 null,
+                                null,
                                 true);
 
-                            ackEvtData.msg = ack;
+                            ackEvtData.resolvedMsg = ack;
 
                             if (newEvts == null)
                                 newEvts = new ArrayList<>();
@@ -2404,18 +2469,28 @@ public class ZookeeperDiscoveryImpl {
      * @param ctx Context for log.
      * @param evtData Event data.
      * @return Ack message.
+     * @throws Exception If failed.
      */
-    @Nullable private DiscoverySpiCustomMessage 
handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) {
+    @Nullable private DiscoverySpiCustomMessage 
handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData)
+        throws Exception {
         if (log.isDebugEnabled())
             log.debug("All nodes processed custom event [ctx=" + ctx + ", 
evtData=" + evtData + ']');
 
         if (!evtData.ackEvent()) {
-            deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath);
+            if (evtData.evtPath != null)
+                deleteCustomEventDataAsync(rtState.zkClient, evtData.evtPath);
+            else {
+                if (evtData.resolvedMsg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
+                    UUID futId = 
((ZkCommunicationErrorResolveFinishMessage)evtData.resolvedMsg).futId;
 
-            assert evtData.msg != null || locNode.order() > 
evtData.topologyVersion() : evtData;
+                    
ZkDistributedCollectDataFuture.deleteFutureData(rtState.zkClient, zkPaths, 
futId);
+                }
+            }
 
-            if (evtData.msg != null)
-                return evtData.msg.ackMessage();
+            assert evtData.resolvedMsg != null || locNode.order() > 
evtData.topologyVersion() : evtData;
+
+            if (evtData.resolvedMsg != null)
+                return evtData.resolvedMsg.ackMessage();
         }
         else {
             String path = zkPaths.ackEventDataPath(evtData.eventId());
@@ -2829,7 +2904,7 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class PingFuture extends GridFutureAdapter<Boolean> implements 
IgniteSpiTimeoutObject, Runnable {
+    private class PingFuture extends GridFutureAdapter<Boolean> implements 
IgniteSpiTimeoutObject {
         /** */
         private final ZookeeperClusterNode node;
 
@@ -2839,16 +2914,20 @@ public class ZookeeperDiscoveryImpl {
         /** */
         private final IgniteUuid id;
 
+        /** */
+        private final ZkRuntimeState rtState;
+
         /**
          * @param node Node.
          */
-        PingFuture(ZookeeperClusterNode node) {
+        PingFuture(ZkRuntimeState rtState, ZookeeperClusterNode node) {
+            this.rtState = rtState;
             this.node = node;
 
             id = IgniteUuid.fromUuid(node.id());
 
             endTime = System.currentTimeMillis() + node.sessionTimeout() + 
1000;
-        }
+        };
 
         /** {@inheritDoc} */
         @Override public IgniteUuid id() {
@@ -2861,33 +2940,34 @@ public class ZookeeperDiscoveryImpl {
         }
 
         /** {@inheritDoc} */
-        @Override public void run() {
+        @Override public void onTimeout() {
             if (checkNodeAndState()) {
-                try {
-                    for (String path : 
zkClient().getChildren(zkPaths.aliveNodesDir)) {
-                        if (node.internalId() == 
ZkIgnitePaths.aliveInternalId(path)) {
-                            onDone(true);
+                runInWorkerThread(new ZkRunnable(rtState, 
ZookeeperDiscoveryImpl.this) {
+                    @Override protected void run0() throws Exception {
+                        if (checkNodeAndState()) {
+                            try {
+                                for (String path : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
+                                    if (node.internalId() == 
ZkIgnitePaths.aliveInternalId(path)) {
+                                        onDone(true);
+
+                                        return;
+                                    }
+                                }
+
+                                onDone(false);
+                            }
+                            catch (Exception e) {
+                                onDone(e);
 
-                            return;
+                                throw e;
+                            }
                         }
                     }
-
-                    onDone(false);
-                }
-                catch (Exception e) {
-                    if (checkNodeAndState())
-                        onDone(e);
-                }
+                });
             }
         }
 
         /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            if (checkNodeAndState())
-                runInWorkerThread(this);
-        }
-
-        /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Boolean res, @Nullable 
Throwable err) {
             if (super.onDone(res, err)) {
                 pingFuts.remove(node.id(), this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e8f85ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 3f6a8dc..ec70be6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingZooKeeperServer;
@@ -70,6 +71,8 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.logger.java.JavaLogger;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.CommunicationProblemContext;
+import org.apache.ignite.spi.discovery.CommunicationProblemResolver;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -135,6 +138,9 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /** */
     private boolean persistence;
 
+    /** */
+    private CommunicationProblemResolver communicationProblemResolver;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         if (testSockNio)
@@ -170,8 +176,6 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg);
 
-        // cfg.setMarshaller(new JdkMarshaller());
-
         cfg.setClientMode(client);
 
         if (userAttrs != null)
@@ -248,6 +252,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @param instances Number of instances.
+     * @return Cluster.
+     */
     private static TestingCluster createTestingCluster(int instances) {
         String tmpDir = System.getProperty("java.io.tmpdir");
 
@@ -270,6 +278,9 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         return new TestingCluster(specs);
     }
 
+    /**
+     * @param file Directory to delete.
+     */
     private static void deleteRecursively0(File file) {
         File[] files = file.listFiles();
 
@@ -279,8 +290,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         for (File f : files) {
             if (f.isDirectory())
                 deleteRecursively0(f);
-            else
-                f.delete();
+            else {
+                if (!f.delete())
+                    throw new IgniteException("Failed to delete file: " + 
f.getAbsolutePath());
+            }
         }
     }
 
@@ -331,6 +344,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
             assertFalse("Unexpected error, see log for details", err);
 
             checkEventsConsistency();
+
+            checkInternalStructuresCleanup();
         }
         finally {
             reset();
@@ -342,6 +357,23 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    private void checkInternalStructuresCleanup() throws Exception {
+        for (Ignite node : G.allGrids()) {
+            final AtomicReference<?> res = 
GridTestUtils.getFieldValue(spi(node), "impl", "commErrProcFut");
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return res.get() == null;
+                }
+            }, 5000);
+
+            assertNull(res.get());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testMetadataUpdate() throws Exception {
         startGrid(0);
 
@@ -879,16 +911,31 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         waitForTopology(initNodes + startNodes - failCnt);
     }
 
+    /**
+     * @param node Node.
+     * @return Last segment of the node's local ZK node path.
+     */
     private static String aliveZkNodePath(Ignite node) {
         return aliveZkNodePath(node.configuration().getDiscoverySpi());
     }
 
+    /**
+     * @param spi Discovery SPI.
+     * @return Last segment of the SPI's local ZK node path.
+     */
     private static String aliveZkNodePath(DiscoverySpi spi) {
         String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", 
"locNodeZkPath");
 
         return path.substring(path.lastIndexOf('/') + 1);
     }
 
+    /**
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param failedZkNodes Failed ZK nodes.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
     private static void waitNoAliveZkNodes(final IgniteLogger log,
         String connectString, final List<String> failedZkNodes,
         long timeout)
@@ -1724,6 +1771,99 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_1() throws Exception {
+        communicationErrorResolve_Simple(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_2() throws Exception {
+        communicationErrorResolve_Simple(10);
+    }
+
+    /**
+     * @param nodes Nodes number.
+     * @throws Exception If failed.
+     */
+    private void communicationErrorResolve_Simple(int nodes) throws Exception {
+        assert nodes > 1;
+
+        sesTimeout = 2000;
+        communicationProblemResolver = new NoOpCommunicationProblemResolver();
+
+        startGridsMultiThreaded(nodes);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 3; i++) {
+            info("Iteration: " + i);
+
+            int idx1 = rnd.nextInt(nodes);
+
+            int idx2;
+
+            do {
+                idx2 = rnd.nextInt(nodes);
+            }
+            while (idx1 == idx2);
+
+            ZookeeperDiscoverySpi spi = spi(ignite(idx1));
+
+            
spi.onCommunicationConnectionError(ignite(idx2).cluster().localNode(), new 
Exception("test"));
+
+            checkInternalStructuresCleanup();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_3() throws Exception {
+        // One node fails before sending communication status.
+        sesTimeout = 2000;
+        communicationProblemResolver = new NoOpCommunicationProblemResolver();
+
+        startGridsMultiThreaded(3);
+
+        sesTimeout = 10_000;
+
+        testSockNio = true;
+        sesTimeout = 5000;
+
+        startGrid(3);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+                
spi.onCommunicationConnectionError(ignite(1).cluster().localNode(), new 
Exception("test"));
+
+                return null;
+            }
+        });
+
+        U.sleep(1000);
+
+        ZkTestClientCnxnSocketNIO nio = 
ZkTestClientCnxnSocketNIO.forNode(ignite(3));
+
+        nio.closeSocket(true);
+
+        try {
+            stopGrid(3);
+
+            fut.get();
+        }
+        finally {
+            nio.allowConnect();
+        }
+
+        waitForTopology(3);
+    }
+
+    /**
      * @param dfltConsistenId Default consistent ID flag.
      * @throws Exception If failed.
      */
@@ -2062,6 +2202,14 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param node Node.
+     * @return Node's discovery SPI.
+     */
+    private static ZookeeperDiscoverySpi spi(Ignite node) {
+        return (ZookeeperDiscoverySpi)node.configuration().getDiscoverySpi();
+    }
+
+    /**
      * @param nodeName Node name.
      * @return Node's discovery SPI.
      * @throws Exception If failed.
@@ -2069,7 +2217,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     private ZookeeperDiscoverySpi waitSpi(final String nodeName) throws 
Exception {
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
-                return spis.contains(nodeName);
+                return spis.containsKey(nodeName);
             }
         }, 5000);
 
@@ -2080,6 +2228,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         return spi;
     }
 
+    /**
+     * @param topVer Topology version.
+     * @return Expected event instance.
+     */
     private static DiscoveryEvent joinEvent(long topVer) {
         DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_JOINED, null);
 
@@ -2088,6 +2240,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         return expEvt;
     }
 
+    /**
+     * @param topVer Topology version.
+     * @return Expected event instance.
+     */
     private static DiscoveryEvent failEvent(long topVer) {
         DiscoveryEvent expEvt = new DiscoveryEvent(null, null, 
EventType.EVT_NODE_FAILED, null);
 
@@ -2370,4 +2526,14 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     private static class C2 implements Serializable {
         // No-op.
     }
+
+    /**
+     * Communication problem resolver which does nothing.
+     */
+    static class NoOpCommunicationProblemResolver implements 
CommunicationProblemResolver {
+        /** {@inheritDoc} */
+        @Override public void resolve(CommunicationProblemContext ctx) {
+            // No-op.
+        }
+    }
 }

Reply via email to