Repository: ignite
Updated Branches:
  refs/heads/ignite-zk-ce b7cbd4cef -> 376a48456


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/376a4845
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/376a4845
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/376a4845

Branch: refs/heads/ignite-zk-ce
Commit: 376a484569db8d1a0eb4b43128d7e327defb5e3d
Parents: b7cbd4c
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Dec 13 16:00:34 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Dec 13 17:52:03 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZkAbstractCallabck.java         |  66 +++
 .../zk/internal/ZkAbstractChildrenCallback.java |  53 +++
 .../zk/internal/ZkAbstractWatcher.java          |  55 +++
 .../zk/internal/ZkCollectDistributedFuture.java | 171 ++++++++
 .../ZkCommunicationErrorProcessFuture.java      | 225 ++++++++---
 ...kCommunicationErrorResolveFinishMessage.java |  38 ++
 ...ZkCommunicationErrorResolveStartMessage.java |  50 +++
 .../zk/internal/ZkDiscoveryEventsData.java      |  18 +
 .../discovery/zk/internal/ZkEventAckFuture.java | 142 -------
 .../zk/internal/ZkForceNodeFailMessage.java     |  55 +++
 .../discovery/zk/internal/ZkIgnitePaths.java    |   8 +
 .../ZkInternalCommunicationErrorMessage.java    |  39 --
 .../ZkInternalForceNodeFailMessage.java         |  55 ---
 .../spi/discovery/zk/internal/ZkRunnable.java   |  51 +++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 397 +++++++++++--------
 15 files changed, 981 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
new file mode 100644
index 0000000..d2efb9f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+
+/**
+ * Base for ZooKeeper callbacks: guards processing with the busy lock and reports errors as fatal.
+ */
+abstract class ZkAbstractCallabck {
+    /** Runtime state; its {@code closing} flag is checked before processing starts. */
+    final ZkRuntimeState rtState;
+
+    /** Discovery implementation that receives fatal processing errors. */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** Busy lock taken from {@code impl.busyLock}; entered while a callback is being processed. */
+    private final GridSpinBusyLock busyLock;
+
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        this.rtState = rtState;
+        this.impl = impl;
+
+        busyLock = impl.busyLock;
+    }
+
+    /**
+     * @return {@code True} if processing may start (state is not closing and the busy lock was entered).
+     */
+    final boolean onProcessStart() {
+        return !rtState.closing && busyLock.enterBusy();
+    }
+
+    /**
+     * Leaves the busy lock entered by a successful {@link #onProcessStart()}.
+     */
+    final void onProcessEnd() {
+        busyLock.leaveBusy();
+    }
+
+    /**
+     * @param e Error, passed to {@code impl.onFatalError} together with the busy lock.
+     */
+    final void onProcessError(Throwable e) {
+        impl.onFatalError(busyLock, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
new file mode 100644
index 0000000..5679993
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Children callback that runs {@code processResult0} under the busy lock and reports any failure.
+ */
+abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck 
implements AsyncCallback.Children2Callback {
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl 
impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+        if (!onProcessStart())
+            return;
+
+        try {
+            processResult0(rc, path, ctx, children, stat);
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    abstract void processResult0(int rc, String path, Object ctx, List<String> 
children, Stat stat)
+        throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
new file mode 100644
index 0000000..9098d05
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Watcher that runs {@link #process0(WatchedEvent)} under the busy lock and reports any failure.
+ */
+abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher 
{
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void process(WatchedEvent evt) {
+        if (!onProcessStart())
+            return;
+
+        try {
+            process0(evt);
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    /**
+     * @param evt Event.
+     * @throws Exception If failed.
+     */
+    protected abstract void process0(WatchedEvent evt) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
new file mode 100644
index 0000000..fa529cf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCollectDistributedFuture.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future completed when each topology node has saved its result under the future path or has failed.
+ */
+class ZkCollectDistributedFuture extends GridFutureAdapter<Void> {
+    /** Logger obtained from the discovery implementation. */
+    private final IgniteLogger log;
+
+    /** ZooKeeper path under which nodes save results (one child per node order). */
+    private final String futPath;
+
+    /** Discovery implementation. */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** Orders of nodes that have neither saved a result nor failed yet. */
+    private final Set<Long> remainingNodes;
+
+    /** Listener called once when this future completes. */
+    private final Callable<Void> lsnr;
+
+    /**
+     * @param impl Discovery impl.
+     * @param rtState Runtime state.
+     * @param futPath Path whose children are watched; {@code lsnr} is called once all nodes are accounted for.
+     */
+    ZkCollectDistributedFuture(ZookeeperDiscoveryImpl impl, ZkRuntimeState rtState, String futPath, Callable<Void> lsnr) throws Exception {
+        this.impl = impl;
+        this.log = impl.log();
+        this.futPath = futPath;
+        this.lsnr = lsnr;
+
+        ZkClusterNodes top = impl.nodes();
+
+        remainingNodes = U.newHashSet(top.nodesByOrder.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values())
+            remainingNodes.add(node.order());
+
+        NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
+
+        if (remainingNodes.isEmpty())
+            completeAndNotifyListener();
+        else
+            rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
+    }
+
+    /**
+     * @throws Exception If listener call failed.
+     */
+    private void completeAndNotifyListener() throws Exception {
+        if (super.onDone())
+            lsnr.call();
+    }
+
+    /**
+     * @param futPath Future path; the result is created as its child named by the node order.
+     * @param client ZooKeeper client.
+     * @param nodeOrder Order of the node saving its result.
+     * @param data Result data.
+     * @throws Exception If failed.
+     */
+    static void saveNodeResult(String futPath, ZookeeperClient client, long nodeOrder, byte[] data) throws Exception {
+        client.createIfNeeded(futPath + "/" + nodeOrder, data, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * @param node Failed node.
+     */
+    void onNodeFail(ZookeeperClusterNode node) throws Exception {
+        long nodeOrder = node.order();
+
+        if (remainingNodes.remove(nodeOrder)) {
+            int remaining = remainingNodes.size();
+
+            if (log.isInfoEnabled()) {
+                log.info("ZkCollectDistributedFuture removed remaining failed node [node=" + nodeOrder +
+                    ", remaining=" + remaining +
+                    ", futPath=" + futPath + ']');
+            }
+
+            if (remaining == 0)
+                completeAndNotifyListener();
+        }
+    }
+
+    /**
+     * Watches children of the future path and removes reported node orders from the remaining set.
+     */
+    class NodeResultsWatcher extends ZkAbstractWatcher implements AsyncCallback.Children2Callback {
+        /**
+         * @param rtState Runtime state.
+         * @param impl Discovery impl.
+         */
+        NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+            super(rtState, impl);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void process0(WatchedEvent evt) {
+            if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
+                impl.zkClient().getChildrenAsync(evt.getPath(), this, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (!onProcessStart())
+                return;
+
+            try {
+                assert rc == 0 : KeeperException.Code.get(rc);
+
+                // No early return here: the busy lock entered by onProcessStart() must be left below.
+                if (!isDone()) {
+                    for (int i = 0; i < children.size(); i++) {
+                        Long nodeOrder = Long.parseLong(children.get(i));
+
+                        if (remainingNodes.remove(nodeOrder)) {
+                            int remaining = remainingNodes.size();
+
+                            if (log.isInfoEnabled()) {
+                                log.info("ZkCollectDistributedFuture added new result [node=" + nodeOrder +
+                                    ", remaining=" + remaining +
+                                    ", futPath=" + path + ']');
+                            }
+
+                            if (remaining == 0)
+                                completeAndNotifyListener();
+                        }
+                    }
+                }
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
index 2ea65e8..d87f500 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -17,24 +17,31 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-class ZkCommunicationErrorProcessFuture extends GridFutureAdapter implements 
IgniteSpiTimeoutObject, Runnable {
+class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> 
implements IgniteSpiTimeoutObject, Runnable {
     /** */
     private final ZookeeperDiscoveryImpl impl;
 
     /** */
-    private final Map<UUID, GridFutureAdapter<Boolean>> errNodes = new 
HashMap<>();
+    private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new 
HashMap<>();
 
     /** */
     private final long endTime;
@@ -42,48 +49,142 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
     /** */
     private final IgniteUuid id;
 
+    /** */
+    private State state;
+
+    /** */
+    private long resolveTopVer;
+
+    /** */
+    private Set<Long> resFailedNodes;
+
+    /** */
+    private ZkCollectDistributedFuture nodeResFut;
+
+    /**
+     * @param impl Discovery impl.
+     * @param timeout Timeout to wait before initiating resolve process.
+     * @return Future.
+     */
+    static ZkCommunicationErrorProcessFuture 
createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) {
+        return new ZkCommunicationErrorProcessFuture(impl, State.WAIT_TIMEOUT, 
timeout);
+    }
+
+    /**
+     * @param impl Discovery impl.
+     * @return Future.
+     */
+    static ZkCommunicationErrorProcessFuture 
createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) {
+        return new ZkCommunicationErrorProcessFuture(impl, 
State.RESOLVE_STARTED, 0);
+    }
+
     /**
      * @param impl Discovery implementation.
+     * @param state Initial state.
      * @param timeout Wait timeout before initiating communication errors 
resolve.
      */
-    ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, long 
timeout) {
+    private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, 
State state, long timeout) {
+        assert state != State.DONE;
+
         this.impl = impl;
 
-        id = IgniteUuid.fromUuid(impl.localNode().id());
+        if (state == State.WAIT_TIMEOUT) {
+            assert timeout > 0 : timeout;
+
+            id = IgniteUuid.fromUuid(impl.localNode().id());
+            endTime = System.currentTimeMillis() + timeout;
+        }
+        else {
+            id = null;
+            endTime = 0;
+        }
 
-        endTime = System.currentTimeMillis() + timeout;
+        this.state = state;
+    }
+
+    void nodeResultCollectFuture(ZkCollectDistributedFuture nodeResFut) {
+        // Set-once check must read the field: the parameter of the same name shadows it.
+        assert this.nodeResFut == null : this.nodeResFut;
+        this.nodeResFut = nodeResFut;
+    }
+
+    void pingNodesAndNotifyFuture(long locNodeOrder, ZkRuntimeState rtState, 
String futPath, Collection<ClusterNode> nodes)
+        throws Exception {
+        ZkCollectDistributedFuture.saveNodeResult(futPath, rtState.zkClient, 
locNodeOrder, null);
     }
 
     /**
-     * @param nodeId Node ID.
-     * @return Future finished when communication error resolve is done.
+     *
      */
-    GridFutureAdapter<Boolean> nodeStatusFuture(UUID nodeId) {
-        GridFutureAdapter<Boolean> fut;
+    void scheduleCheckOnTimeout() {
+        synchronized (this) {
+            if (state == State.WAIT_TIMEOUT)
+                impl.spi.getSpiContext().addTimeoutObject(this);
+        }
+    }
 
-        // TODO ZK: finish race.
+    /**
+     * @param topVer Topology version.
+     * @return {@code False} if the future is already done and another future instance must be created.
+     */
+    boolean onStartResolveRequest(long topVer) {
+        synchronized (this) {
+            if (state == State.DONE)
+                return false;
+
+            if (state == State.WAIT_TIMEOUT)
+                impl.spi.getSpiContext().removeTimeoutObject(this);
+
+            assert resolveTopVer == 0 : resolveTopVer;
+
+            resolveTopVer = topVer;
+
+            state = State.RESOLVE_STARTED;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param node Node.
+     * @return Future finished when communication error resolve is done or 
{@code null} if another
+     *      resolve process should be started.
+     */
+    @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) 
{
+        GridFutureAdapter<Boolean> fut;
 
         synchronized (this) {
-            fut = errNodes.get(nodeId);
+            if (state == State.DONE) {
+                if (resolveTopVer != 0 && node.order() <= resolveTopVer) {
+                    Boolean res = !F.contains(resFailedNodes, node.order());
+
+                    return new GridFinishedFuture<>(res);
+                }
+                else
+                    return null;
+            }
+
+            fut = nodeFuts.get(node.order());
 
             if (fut == null)
-                errNodes.put(nodeId, fut = new GridFutureAdapter<>());
+                nodeFuts.put(node.order(), fut = new GridFutureAdapter<>());
         }
 
-        if (impl.node(nodeId) == null)
+        if (impl.node(node.order()) == null)
             fut.onDone(false);
 
         return fut;
     }
 
     /**
-     * @param nodeId Node ID.
+     * @param node Failed node.
      */
-    void onNodeFailed(UUID nodeId) {
-        GridFutureAdapter<Boolean> fut;
+    void onNodeFailed(ClusterNode node) {
+        GridFutureAdapter<Boolean> fut = null;
 
         synchronized (this) {
-            fut = errNodes.get(nodeId);
+            if (state == State.WAIT_TIMEOUT)
+                fut = nodeFuts.get(node.order());
         }
 
         if (fut != null)
@@ -92,14 +193,50 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
 
     /** {@inheritDoc} */
     @Override public void run() {
-        if (checkNotDoneOnTimeout()) {
+        // Run from zk discovery worker pool after timeout.
+        if (processTimeout()) {
             try {
-                impl.sendCustomMessage(new 
ZkInternalCommunicationErrorMessage());
+                impl.sendCustomMessage(new 
ZkCommunicationErrorResolveStartMessage(UUID.randomUUID()));
             }
             catch (Exception e) {
-                onError(e);
+                Collection<GridFutureAdapter<Boolean>> futs;
+
+                synchronized (this) {
+                    if (state != State.WAIT_TIMEOUT)
+                        return;
+
+                    state = State.DONE;
+
+                    futs = nodeFuts.values(); // nodeFuts should not be 
modified after state changed to DONE.
+                }
+
+                for (GridFutureAdapter<Boolean> fut : futs)
+                    fut.onDone(e);
+
+                onDone(e);
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if need initiate resolve process after timeout 
expired.
+     */
+    private boolean processTimeout() {
+        synchronized (this) {
+            if (state != State.WAIT_TIMEOUT)
+                return false;
+
+            for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) {
+                if (!fut.isDone())
+                    return true;
             }
+
+            state = State.DONE;
         }
+
+        onDone(null, null);
+
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -114,43 +251,37 @@ class ZkCommunicationErrorProcessFuture extends 
GridFutureAdapter implements Ign
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        if (isDone())
-            return;
-
-        if (checkNotDoneOnTimeout())
+        if (processTimeout())
             impl.runInWorkerThread(this);
     }
 
-    /**
-     * @param e Error.
-     */
-    private void onError(Exception e) {
-        List<GridFutureAdapter<Boolean>> futs;
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
+        if (super.onDone(res, err)) {
+            impl.clearCommunicationErrorProcessFuture(this);
 
-        synchronized (this) {
-            futs = new ArrayList<>(errNodes.values());
+            return true;
         }
 
-        for (GridFutureAdapter<Boolean> fut : futs)
-            fut.onDone(e);
+        return false;
+    }
 
-        onDone(e);
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorProcessFuture.class, this);
     }
 
     /**
-     * @return {@code True} if future already finished.
+     *
      */
-    private boolean checkNotDoneOnTimeout() {
-        // TODO ZK check state.
-        synchronized (this) {
-            for (GridFutureAdapter<Boolean> fut : errNodes.values()) {
-                if (!fut.isDone())
-                    return false;
-            }
-        }
+    enum State {
+        /** */
+        DONE,
 
-        onDone(null);
+        /** */
+        WAIT_TIMEOUT,
 
-        return true;
+        /** */
+        RESOLVE_STARTED
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
new file mode 100644
index 0000000..144a5bf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ * Internal message carrying a future ID; presumably marks the end of communication error resolve.
+ */
+class ZkCommunicationErrorResolveFinishMessage implements ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    final UUID futId;
+
+    /**
+     * @param futId Future ID.
+     */
+    ZkCommunicationErrorResolveFinishMessage(UUID futId) {
+        this.futId = futId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
new file mode 100644
index 0000000..e619d7b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Custom discovery message carrying a unique ID; sent to start communication error resolve.
+ */
+public class ZkCommunicationErrorResolveStartMessage implements 
DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique ID. */
+    final UUID id;
+
+    /**
+     * @param id Unique ID.
+     */
+    ZkCommunicationErrorResolveStartMessage(UUID id) {
+        this.id = id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index 37dc7df..faea49e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.TreeMap;
+import java.util.UUID;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -44,6 +45,9 @@ class ZkDiscoveryEventsData implements Serializable {
     /** */
     TreeMap<Long, ZkDiscoveryEventData> evts;
 
+    /** */
+    private UUID commErrFutId;
+
     /**
      * @param topVer Current topology version.
      * @param gridStartTime Cluster start time.
@@ -56,6 +60,20 @@ class ZkDiscoveryEventsData implements Serializable {
     }
 
     /**
+     * @return Future ID.
+     */
+    @Nullable UUID communicationErrorResolveFutureId() {
+        return commErrFutId;
+    }
+
+    /**
+     * @param id Communication error resolve future ID.
+     */
+    void communicationErrorResolveFutureId(UUID id) {
+        commErrFutId = id;
+    }
+
+    /**
      * @param nodes Current nodes in topology (these nodes should ack that 
event processed).
      * @param evt Event.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
deleted file mode 100644
index ffe65c3..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.List;
-import java.util.Set;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkEventAckFuture extends GridFutureAdapter<Void> implements 
Watcher, AsyncCallback.Children2Callback {
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final ZookeeperDiscoveryImpl impl;
-
-    /** */
-    private final Long evtId;
-
-    /** */
-    private final String evtPath;
-
-    /** */
-    private final int expAcks;
-
-    /** */
-    private final Set<Integer> remaininAcks;
-
-    /**
-     * @param impl
-     * @param evtPath
-     * @param evtId
-     */
-    ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) {
-        this.impl = impl;
-        this.log = impl.log();
-        this.evtPath = evtPath;
-        this.evtId = evtId;
-
-        ZkClusterNodes top = impl.nodes();
-
-        remaininAcks = U.newHashSet(top.nodesById.size());
-
-        for (ZookeeperClusterNode node : top.nodesByInternalId.values()) {
-            if (!node.isLocal())
-                remaininAcks.add(node.internalId());
-        }
-
-        expAcks = remaininAcks.size();
-
-        if (expAcks == 0)
-            onDone();
-        else
-            impl.zkClient().getChildrenAsync(evtPath, this, this);
-    }
-
-    /**
-     * @return Event ID.
-     */
-    Long eventId() {
-        return evtId;
-    }
-
-    /**
-     * @param node Failed node.
-     */
-    void onNodeFail(ZookeeperClusterNode node) {
-        assert !remaininAcks.isEmpty();
-
-        if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty())
-            onDone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
-        if (super.onDone(res, err)) {
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void process(WatchedEvent evt) {
-        if (isDone())
-            return;
-
-        if (evt.getType() == Event.EventType.NodeChildrenChanged) {
-            if (evtPath.equals(evt.getPath()))
-                impl.zkClient().getChildrenAsync(evtPath, this, this);
-            else
-                U.warn(log, "Received event for unknown path: " + 
evt.getPath());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-        assert rc == 0 : KeeperException.Code.get(rc);
-
-        if (isDone())
-            return;
-
-        if (expAcks == stat.getCversion()) {
-            log.info("Received expected number of acks [expCnt=" + expAcks + 
", cVer=" + stat.getCversion() + ']');
-
-            onDone();
-        }
-        else {
-            for (int i = 0; i < children.size(); i++) {
-                Integer nodeInternalId = Integer.parseInt(children.get(i));
-
-                if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() 
== 0)
-                    onDone();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
new file mode 100644
index 0000000..333f457
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal custom discovery message requesting a forced failure of the node with the given ID.
+ */
+public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, 
ZkInternalMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the node to fail. */
+    final UUID nodeId;
+
+    /** Warning to be displayed on all nodes, may be {@code null}. */
+    final String warning;
+
+    /**
+     * @param nodeId Node ID.
+     * @param warning Warning to be displayed on all nodes.
+     */
+    ZkForceNodeFailMessage(UUID nodeId, String warning) {
+        this.nodeId = nodeId;
+        this.warning = warning;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index f08032a..06c5d9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -292,4 +292,12 @@ class ZkIgnitePaths {
     String ackEventDataPath(long evtId) {
         return customEvtsAcksDir + "/" + String.valueOf(evtId);
     }
+
+    /**
+     * @param id Future ID.
+     * @return Future base path: the {@code "f-" + id} node under {@code evtsPath}.
+     */
+    String distributedFutureBasePath(UUID id) {
+        return evtsPath + "/f-" + id;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
deleted file mode 100644
index d7ed7ab..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalCommunicationErrorMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkInternalCommunicationErrorMessage implements 
DiscoverySpiCustomMessage, ZkInternalMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
deleted file mode 100644
index 8d7a3df..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public class ZkInternalForceNodeFailMessage implements 
DiscoverySpiCustomMessage, ZkInternalMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    final UUID nodeId;
-
-    /** */
-    final String warning;
-
-    /**
-     * @param nodeId Node ID.
-     * @param warning Warning to be displayed on all nodes.
-     */
-    ZkInternalForceNodeFailMessage(UUID nodeId, String warning) {
-        this.nodeId = nodeId;
-        this.warning = warning;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMutable() {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
new file mode 100644
index 0000000..fb6cf89
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * {@link Runnable} whose body ({@link #run0()}) is executed only if {@code onProcessStart()} allows it;
+ * normal completion is reported via {@code onProcessEnd()}, any failure via {@code onProcessError(Throwable)}.
+ */
+public abstract class ZkRunnable extends ZkAbstractCallabck implements 
Runnable {
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        if (!onProcessStart())
+            return;
+
+        try {
+            run0();
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    /**
+     * Actual task logic, called from {@link #run()}.
+     *
+     * @throws Exception If failed.
+     */
+    protected abstract void run0() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/376a4845/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index d21c18b..a153d11 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +44,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
@@ -52,6 +54,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.MarshallerUtils;
@@ -88,7 +91,7 @@ public class ZookeeperDiscoveryImpl {
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
 
     /** */
-    private final ZookeeperDiscoverySpi spi;
+    final ZookeeperDiscoverySpi spi;
 
     /** */
     private final String igniteInstanceName;
@@ -109,7 +112,7 @@ public class ZookeeperDiscoveryImpl {
     private final IgniteLogger log;
 
     /** */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+    final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** */
     private final ZookeeperClusterNode locNode;
@@ -217,9 +220,25 @@ public class ZookeeperDiscoveryImpl {
         return rtState.top.nodesById.get(nodeId);
     }
 
+    /**
+     * @param nodeOrder Node order, must be positive.
+     * @return Node with the given order from the current topology, if any.
+     */
+    @Nullable public ZookeeperClusterNode node(long nodeOrder) {
+        assert nodeOrder > 0 : nodeOrder;
+
+        return rtState.top.nodesByOrder.get(nodeOrder);
+    }
+
     /** */
     private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
 
+    /**
+     * Resets {@link #commErrProcFut} to {@code null} if it still references the given future.
+     *
+     * @param fut Completed future.
+     */
+    void 
clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) {
+        assert fut.isDone() : fut;
+
+        commErrProcFut.compareAndSet(fut, null);
+    }
+
     /**
      * @param node0 Problem node ID
      * @param err Connect error.
@@ -230,24 +249,41 @@ public class ZookeeperDiscoveryImpl {
         if (node == null)
             return;
 
-        ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+        IgniteInternalFuture<Boolean> nodeStatusFut;
+
+        for (;;) {
+            ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+
+            if (fut == null || fut.isDone()) {
+                ZkCommunicationErrorProcessFuture newFut = 
ZkCommunicationErrorProcessFuture.createOnCommunicationError(
+                    this,
+                    node.sessionTimeout() + 1000);
+
+                if (commErrProcFut.compareAndSet(fut, newFut)) {
+                    fut = newFut;
 
-        if (fut == null || fut.isDone()) {
-            ZkCommunicationErrorProcessFuture newFut = new 
ZkCommunicationErrorProcessFuture(
-                this,
-                node.sessionTimeout() + 1000);
+                    fut.scheduleCheckOnTimeout();
+                }
+                else
+                    fut = commErrProcFut.get();
+            }
 
-            if (commErrProcFut.compareAndSet(fut, newFut)) {
-                fut = newFut;
+            nodeStatusFut = fut.nodeStatusFuture(node);
 
-                spi.getSpiContext().addTimeoutObject(fut);
+            if (nodeStatusFut != null)
+                break;
+            else {
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Previous communication error process future 
failed: " + e);
+                }
             }
-            else
-                fut = commErrProcFut.get();
         }
 
         try {
-            fut.nodeStatusFuture(node.id()).get();
+            nodeStatusFut.get();
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException(e);
@@ -312,7 +348,7 @@ public class ZookeeperDiscoveryImpl {
             return;
         }
 
-        sendCustomMessage(new ZkInternalForceNodeFailMessage(nodeId, warning));
+        sendCustomMessage(new ZkForceNodeFailMessage(nodeId, warning));
     }
 
     /**
@@ -1503,22 +1539,23 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param zkClient Client.
      * @param evtPath Event path.
      * @param sndNodeId Sender node ID.
      * @return Event data.
      * @throws Exception If failed.
      */
-    private byte[] readCustomEventData(String evtPath, UUID sndNodeId) throws 
Exception {
+    private byte[] readCustomEventData(ZookeeperClient zkClient, String 
evtPath, UUID sndNodeId) throws Exception {
         int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
 
         if (partCnt > 1) {
             String partsBasePath = zkPaths.customEventPartsBasePath(
                 ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId);
 
-            return readMultipleParts(rtState.zkClient, partsBasePath, partCnt);
+            return readMultipleParts(zkClient, partsBasePath, partCnt);
         }
         else
-            return rtState.zkClient.getData(zkPaths.customEvtsDir + "/" + 
evtPath);
+            return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath);
     }
 
     /**
@@ -1528,6 +1565,9 @@ public class ZookeeperDiscoveryImpl {
     private void generateCustomEvents(List<String> customEvtNodes) throws 
Exception {
         assert rtState.crd;
 
+        ZookeeperClient zkClient = rtState.zkClient;
+        ZkDiscoveryEventsData evtsData = rtState.evtsData;
+
         TreeMap<Integer, String> newEvts = null;
 
         for (int i = 0; i < customEvtNodes.size(); i++) {
@@ -1535,7 +1575,7 @@ public class ZookeeperDiscoveryImpl {
 
             int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
 
-            if (evtSeq > rtState.evtsData.procCustEvt) {
+            if (evtSeq > evtsData.procCustEvt) {
                 if (newEvts == null)
                     newEvts = new TreeMap<>();
 
@@ -1547,6 +1587,8 @@ public class ZookeeperDiscoveryImpl {
             Set<UUID> alives = null;
 
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
+                evtsData.procCustEvt = evtE.getKey();
+
                 String evtPath = evtE.getValue();
 
                 UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
@@ -1557,23 +1599,21 @@ public class ZookeeperDiscoveryImpl {
                     sndNode = null;
 
                 if (sndNode != null) {
-                    byte[] evtBytes = readCustomEventData(evtPath, sndNodeId);
+                    byte[] evtBytes = readCustomEventData(zkClient, evtPath, 
sndNodeId);
 
                     DiscoverySpiCustomMessage msg;
 
                     try {
                         msg = unmarshalZip(evtBytes);
 
-                        rtState.evtsData.evtIdGen++;
-
-                        if (msg instanceof ZkInternalForceNodeFailMessage) {
-                            ZkInternalForceNodeFailMessage msg0 = 
(ZkInternalForceNodeFailMessage)msg;
+                        if (msg instanceof ZkForceNodeFailMessage) {
+                            ZkForceNodeFailMessage msg0 = 
(ZkForceNodeFailMessage)msg;
 
                             if (alives == null)
                                 alives = new 
HashSet<>(rtState.top.nodesById.keySet());
 
                             if (alives.contains(msg0.nodeId)) {
-                                rtState.evtsData.topVer++;
+                                evtsData.topVer++;
 
                                 alives.remove(msg0.nodeId);
 
@@ -1581,9 +1621,9 @@ public class ZookeeperDiscoveryImpl {
 
                                 assert node != null :  msg0.nodeId;
 
-                                for (String child : 
zkClient().getChildren(zkPaths.aliveNodesDir)) {
+                                for (String child : 
zkClient.getChildren(zkPaths.aliveNodesDir)) {
                                     if (ZkIgnitePaths.aliveInternalId(child) 
== node.internalId()) {
-                                        
zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
+                                        
zkClient.deleteIfExistsAsync(zkPaths.aliveNodesDir + "/" + child);
 
                                         break;
                                     }
@@ -1593,20 +1633,51 @@ public class ZookeeperDiscoveryImpl {
                                 if (log.isDebugEnabled())
                                     log.debug("Ignore forcible node fail 
request for unknown node: " + msg0.nodeId);
 
+                                deleteCustomEventDataAsync(zkClient, evtPath);
+
+                                continue;
+                            }
+                        }
+                        else if (msg instanceof 
ZkCommunicationErrorResolveStartMessage) {
+                            ZkCommunicationErrorResolveStartMessage msg0 =
+                                (ZkCommunicationErrorResolveStartMessage)msg;
+
+                            if (evtsData.communicationErrorResolveFutureId() 
!= null) {
+                                if (log.isInfoEnabled()) {
+                                    log.info("Ignore communication error 
resolve message, resolve process " +
+                                        "already started [sndNode=" + sndNode 
+ ']');
+                                }
+
+                                deleteCustomEventDataAsync(zkClient, evtPath);
+
                                 continue;
                             }
+                            else {
+                                if (log.isInfoEnabled()) {
+                                    log.info("Start communication error 
resolve [sndNode=" + sndNode +
+                                        ", topVer=" + evtsData.topVer + ']');
+                                }
+
+                                
zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
+                                    null,
+                                    PERSISTENT);
+
+                                
evtsData.communicationErrorResolveFutureId(msg0.id);
+                            }
                         }
 
+                        evtsData.evtIdGen++;
+
                         ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
-                            rtState.evtsData.evtIdGen,
-                            rtState.evtsData.topVer,
+                            evtsData.evtIdGen,
+                            evtsData.topVer,
                             sndNodeId,
                             evtPath,
                             false);
 
                         evtData.msg = msg;
 
-                        
rtState.evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+                        evtsData.addEvent(rtState.top.nodesByOrder.values(), 
evtData);
 
                         if (log.isDebugEnabled())
                             log.debug("Generated CUSTOM event [evt=" + evtData 
+ ", msg=" + msg + ']');
@@ -1622,8 +1693,6 @@ public class ZookeeperDiscoveryImpl {
 
                     deleteCustomEventDataAsync(rtState.zkClient, evtPath);
                 }
-
-                rtState.evtsData.procCustEvt = evtE.getKey();
             }
 
             saveAndProcessNewEvents();
@@ -1695,6 +1764,8 @@ public class ZookeeperDiscoveryImpl {
     private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws 
Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
+        ZookeeperClient zkClient = rtState.zkClient;
+
         boolean updateNodeInfo = false;
 
         for (ZkDiscoveryEventData evtData : 
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
@@ -1768,10 +1839,12 @@ public class ZookeeperDiscoveryImpl {
                             if (evtData0.ackEvent()) {
                                 String path = 
zkPaths.ackEventDataPath(evtData0.eventId());
 
-                                msg = 
unmarshalZip(rtState.zkClient.getData(path));
+                                msg = unmarshalZip(zkClient.getData(path));
                             }
                             else {
-                                byte[] msgBytes = 
readCustomEventData(evtData0.evtPath, evtData0.sndNodeId);
+                                byte[] msgBytes = readCustomEventData(zkClient,
+                                    evtData0.evtPath,
+                                    evtData0.sndNodeId);
 
                                 msg = unmarshalZip(msgBytes);
                             }
@@ -1814,7 +1887,7 @@ public class ZookeeperDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Update processed events: " + 
rtState.locNodeInfo.lastProcEvt);
 
-            rtState.zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
+            zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
         }
     }
 
@@ -1897,29 +1970,134 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void processInternalMessage(ZkDiscoveryCustomEventData evtData, 
ZkInternalMessage msg) throws Exception {
-        if (msg instanceof ZkInternalForceNodeFailMessage) {
-            ZkInternalForceNodeFailMessage msg0 = 
(ZkInternalForceNodeFailMessage)msg;
+        if (msg instanceof ZkForceNodeFailMessage)
+            processForceNodeFailMessage((ZkForceNodeFailMessage)msg, evtData);
+        else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
+            processStartResolveCommunicationErrorMessage(
+                (ZkCommunicationErrorResolveStartMessage)msg,
+                evtData);
+        }
+        else if (msg instanceof ZkCommunicationErrorResolveFinishMessage) {
+            ZkCommunicationErrorResolveFinishMessage msg0 = 
(ZkCommunicationErrorResolveFinishMessage)msg;
+        }
+    }
+
+    /**
+     * Logs the forced node failure request and processes failure of the target node.
+     *
+     * @param msg Message.
+     * @param evtData Event data.
+     * @throws Exception If failed.
+     */
+    private void processForceNodeFailMessage(ZkForceNodeFailMessage msg, 
ZkDiscoveryCustomEventData evtData)
+        throws Exception {
+        ClusterNode creatorNode = rtState.top.nodesById.get(evtData.sndNodeId);
+
+        if (msg.warning != null) {
+            U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
+                "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
+                ", nodeId=" + msg.nodeId +
+                ", msg=" + msg.warning + ']');
+        }
+        else {
+            U.warn(log, "Received force EVT_NODE_FAILED event [" +
+                "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
+                ", nodeId=" + msg.nodeId + ']');
+        }
+
+        ZookeeperClusterNode node = rtState.top.nodesById.get(msg.nodeId);
+
+        assert node != null : msg.nodeId;
+
+        processNodeFail(node.internalId(), evtData.topologyVersion());
+    }
+
+    /**
+     * @param msg Message.
+     * @param evtData Event data.
+     * @throws Exception If failed.
+     */
+    private void 
processStartResolveCommunicationErrorMessage(ZkCommunicationErrorResolveStartMessage
 msg,
+        ZkDiscoveryCustomEventData evtData) throws Exception {
+        ZkCommunicationErrorProcessFuture fut;
 
-            ClusterNode creatorNode = 
rtState.top.nodesById.get(evtData.sndNodeId);
+        for (;;) {
+            fut = commErrProcFut.get();
+
+            if (fut == null || fut.isDone()) {
+                ZkCommunicationErrorProcessFuture newFut =
+                    
ZkCommunicationErrorProcessFuture.createOnStartResolveRequest(this);
 
-            if (msg0.warning != null) {
-                U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
-                    "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
-                    ", nodeId=" + msg0.nodeId +
-                    ", msg=" + msg0.warning + ']');
+                if (commErrProcFut.compareAndSet(fut, newFut))
+                    fut = newFut;
+                else
+                    fut = commErrProcFut.get();
             }
+
+            if (fut.onStartResolveRequest(evtData.topologyVersion()))
+                break;
             else {
-                U.warn(log, "Received force EVT_NODE_FAILED event [" +
-                    "nodeInitiatedEvt=" + (creatorNode != null ? creatorNode : 
evtData.sndNodeId) +
-                    ", nodeId=" + msg0.nodeId + ']');
+                try {
+                    fut.get();
+                }
+                catch (Exception e) {
+                    U.warn(log, "Previous communication error process future 
failed: " + e);
+                }
             }
+        }
+
+        assert !fut.isDone() : fut;
 
-            ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId);
+        final String futPath = zkPaths.distributedFutureBasePath(msg.id);
+        final ZkCommunicationErrorProcessFuture fut0 = fut;
+        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
+
+        if (rtState.crd) {
+            ZkCollectDistributedFuture nodeResFut = new 
ZkCollectDistributedFuture(this, rtState, futPath,
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        // Future is completed from ZK event thread.
+                        finishCommunicationResolveProcess(rtState);
 
-            assert node != null : msg0.nodeId;
+                        return null;
+                    }
+                }
+            );
 
-            processNodeFail(node.internalId(), evtData.topologyVersion());
+            fut.nodeResultCollectFuture(nodeResFut);
         }
+
+        runInWorkerThread(new ZkRunnable(rtState, this) {
+            @Override protected void run0() throws Exception {
+                fut0.pingNodesAndNotifyFuture(locNode.order(), rtState, 
futPath, topSnapshot);
+            }
+        });
+    }
+
+    /**
+     * Clears the stored communication error resolve future ID, adds a
+     * {@link ZkCommunicationErrorResolveFinishMessage} event to the events data and
+     * calls {@code saveAndProcessNewEvents()}.
+     *
+     * @param rtState Runtime state.
+     * @throws Exception If failed.
+     */
+    void finishCommunicationResolveProcess(ZkRuntimeState rtState) throws Exception {
+        ZkDiscoveryEventsData evtsData = rtState.evtsData;
+
+        UUID futId = evtsData.communicationErrorResolveFutureId();
+
+        assert futId != null;
+
+        evtsData.communicationErrorResolveFutureId(null);
+
+        ZkCommunicationErrorResolveFinishMessage msg = new ZkCommunicationErrorResolveFinishMessage(futId);
+
+        // Allocate a fresh event ID (as generateCustomEvents does before building an event),
+        // otherwise this event would reuse the ID of the last generated event.
+        evtsData.evtIdGen++;
+
+        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+            evtsData.evtIdGen,
+            evtsData.topVer,
+            locNode.id(),
+            null,
+            false);
+
+        evtData.msg = msg;
+
+        evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+
+        saveAndProcessNewEvents();
     }
 
     /**
@@ -2041,7 +2219,7 @@ public class ZookeeperDiscoveryImpl {
             ZkCommunicationErrorProcessFuture commErrFut = commErrProcFut.get();
 
             if (commErrFut != null)
-                commErrFut.onNodeFailed(failedNode.id());
+                commErrFut.onNodeFailed(failedNode);
 
             final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
@@ -2301,8 +2479,6 @@ public class ZookeeperDiscoveryImpl {
             connState = ConnectionState.STOPPED;
         }
 
-        ZookeeperClient zkClient = rtState.zkClient;
-
         rtState.onCloseStart();
 
         busyLock.block();
@@ -2311,6 +2487,8 @@ public class ZookeeperDiscoveryImpl {
 
         joinFut.onDone(e);
 
+        ZookeeperClient zkClient = rtState.zkClient;
+
         if (zkClient != null)
             zkClient.close();
 
@@ -2321,7 +2499,7 @@ public class ZookeeperDiscoveryImpl {
      * @param busyLock Busy lock.
      * @param err Error.
      */
-    private void onFatalError(GridSpinBusyLock busyLock, Throwable err) {
+    void onFatalError(GridSpinBusyLock busyLock, Throwable err) {
         busyLock.leaveBusy();
 
         if (err instanceof ZookeeperClientFailedException)
@@ -2444,111 +2622,12 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    abstract class ZkCallabck {
-        /** */
-        final ZkRuntimeState rtState;
-
-        /**
-         * @param rtState Runtime state.
-         */
-        ZkCallabck(ZkRuntimeState rtState) {
-            this.rtState = rtState;
-        }
-
-        /**
-         * @return {@code True} if is able to start processing.
-         */
-        final boolean onProcessStart() {
-            return !rtState.closing && busyLock.enterBusy();
-        }
-
-        /**
-         *
-         */
-        final void onProcessEnd() {
-            busyLock.leaveBusy();
-        }
-
-        /**
-         * @param e Error.
-         */
-        final void onProcessError(Throwable e) {
-            onFatalError(busyLock, e);
-        }
-    }
-
-    /**
-     *
-     */
-    abstract class AbstractWatcher extends ZkCallabck implements Watcher {
-        /**
-         * @param rtState Runtime state.
-         */
-        AbstractWatcher(ZkRuntimeState rtState) {
-            super(rtState);
-        }
-
-        /** {@inheritDoc} */
-        @Override public final void process(WatchedEvent evt) {
-            if (!onProcessStart())
-                return;
-
-            try {
-                process0(evt);
-
-                onProcessEnd();
-            }
-            catch (Throwable e) {
-                onProcessError(e);
-            }
-        }
-
-        /**
-         * @param evt Event.
-         * @throws Exception If failed.
-         */
-        protected abstract void process0(WatchedEvent evt) throws Exception;
-    }
-
-    /**
-     *
-     */
-    abstract class AbstractChildrenCallback extends ZkCallabck implements AsyncCallback.Children2Callback {
-        /**
-         * @param rtState Runtime state.
-         */
-        AbstractChildrenCallback(ZkRuntimeState rtState) {
-            super(rtState);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-            if (!onProcessStart())
-                return;
-
-            try {
-                processResult0(rc, path, ctx, children, stat);
-
-                onProcessEnd();
-            }
-            catch (Throwable e) {
-                onProcessError(e);
-            }
-        }
-
-        abstract void processResult0(int rc, String path, Object ctx, List<String> children, Stat stat)
-            throws Exception;
-    }
-
-    /**
-     *
-     */
-    private class ZkWatcher extends AbstractWatcher implements ZkRuntimeState.ZkWatcher {
+    private class ZkWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkWatcher {
         /**
          * @param rtState Runtime state.
          */
         ZkWatcher(ZkRuntimeState rtState) {
-            super(rtState);
+            super(rtState, ZookeeperDiscoveryImpl.this);
         }
 
         /** {@inheritDoc} */
@@ -2619,12 +2698,12 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class AliveNodeDataWatcher extends AbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher {
+    private class AliveNodeDataWatcher extends ZkAbstractWatcher implements ZkRuntimeState.ZkAliveNodeDataWatcher {
         /**
          * @param rtState Runtime state.
          */
         AliveNodeDataWatcher(ZkRuntimeState rtState) {
-            super(rtState);
+            super(rtState, ZookeeperDiscoveryImpl.this);
         }
 
         /** {@inheritDoc} */
@@ -2691,12 +2770,12 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    private class PreviousNodeWatcher extends AbstractWatcher implements AsyncCallback.StatCallback {
+    private class PreviousNodeWatcher extends ZkAbstractWatcher implements AsyncCallback.StatCallback {
         /**
          * @param rtState Runtime state.
          */
         PreviousNodeWatcher(ZkRuntimeState rtState) {
-            super(rtState);
+            super(rtState, ZookeeperDiscoveryImpl.this);
         }
 
         /** {@inheritDoc} */
@@ -2731,12 +2810,12 @@ public class ZookeeperDiscoveryImpl {
     /**
      *
      */
-    class CheckCoordinatorCallback extends AbstractChildrenCallback {
+    class CheckCoordinatorCallback extends ZkAbstractChildrenCallback {
         /**
          * @param rtState Runtime state.
          */
         CheckCoordinatorCallback(ZkRuntimeState rtState) {
-            super(rtState);
+            super(rtState, ZookeeperDiscoveryImpl.this);
         }
 
         /** {@inheritDoc} */

Reply via email to