http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
new file mode 100644
index 0000000..5645791
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDistributedCollectDataFuture.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ */
+class ZkDistributedCollectDataFuture extends GridFutureAdapter<Void> {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final String futPath;
+
+    /** */
+    private final Set<Long> remainingNodes;
+
+    /** */
+    private final Callable<Void> lsnr;
+
+    /**
+     * @param impl Disovery impl
+     * @param rtState Runtime state.
+     * @param futPath Future path.
+     * @param lsnr Future listener.
+     * @throws Exception If listener call failed.
+     */
+    ZkDistributedCollectDataFuture(
+        ZookeeperDiscoveryImpl impl,
+        ZkRuntimeState rtState,
+        String futPath,
+        Callable<Void> lsnr)
+        throws Exception
+    {
+        this.log = impl.log();
+        this.futPath = futPath;
+        this.lsnr = lsnr;
+
+        ZkClusterNodes top = rtState.top;
+
+        // Assume new nodes can not join while future is in progress.
+
+        remainingNodes = U.newHashSet(top.nodesByOrder.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values())
+            remainingNodes.add(node.order());
+
+        NodeResultsWatcher watcher = new NodeResultsWatcher(rtState, impl);
+
+        if (remainingNodes.isEmpty())
+            completeAndNotifyListener();
+        else {
+            if (log.isInfoEnabled()) {
+                log.info("Initialize data collect future [futPath=" + futPath 
+ ", " +
+                    "remainingNodes=" + remainingNodes.size() + ']');
+            }
+
+            rtState.zkClient.getChildrenAsync(futPath, watcher, watcher);
+        }
+    }
+
+    /**
+     * @throws Exception If listener call failed.
+     */
+    private void completeAndNotifyListener() throws Exception {
+        if (super.onDone())
+            lsnr.call();
+    }
+
+    /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @param data
+     * @throws Exception If failed.
+     */
+    static void saveNodeResult(String futPath, ZookeeperClient client, long 
nodeOrder, byte[] data) throws Exception {
+        client.createIfNeeded(futPath + "/" + nodeOrder, data, 
CreateMode.PERSISTENT);
+    }
+
+    /**
+     * @param futPath
+     * @param client
+     * @param nodeOrder
+     * @return Node result data.
+     * @throws Exception If fai.ed
+     */
+    static byte[] readNodeResult(String futPath, ZookeeperClient client, long 
nodeOrder) throws Exception {
+        return client.getData(futPath + "/" + nodeOrder);
+    }
+
+    /**
+     * @param futResPath Result path.
+     * @param client Client.
+     * @param data Result data.
+     * @throws Exception If failed.
+     */
+    static void saveResult(String futResPath, ZookeeperClient client, byte[] 
data) throws Exception {
+        client.createIfNeeded(futResPath, data, CreateMode.PERSISTENT);
+    }
+
+    static byte[] readResult(ZookeeperClient client, ZkIgnitePaths paths, UUID 
futId) throws Exception {
+        return client.getData(paths.distributedFutureResultPath(futId));
+    }
+
+    /**
+     * @param client Client.
+     * @param paths Paths utils.
+     * @param futId Future ID.
+     * @throws Exception If failed.
+     */
+    static void deleteFutureData(ZookeeperClient client, ZkIgnitePaths paths, 
UUID futId) throws Exception {
+        // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
+        String evtDir = paths.distributedFutureBasePath(futId);
+
+        try {
+            client.deleteAll(evtDir,
+                client.getChildren(evtDir),
+                -1);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // TODO ZK
+        }
+
+        client.deleteIfExists(evtDir, -1);
+
+        client.deleteIfExists(paths.distributedFutureResultPath(futId), -1);
+    }
+
+    /**
+     * @param top Current topology.
+     * @throws Exception If listener call failed.
+     */
+    void onTopologyChange(ZkClusterNodes top) throws Exception {
+        if (remainingNodes.isEmpty())
+            return;
+
+        for (Iterator<Long> it = remainingNodes.iterator(); it.hasNext();) {
+            Long nodeOrder = it.next();
+
+            if (!top.nodesByOrder.containsKey(nodeOrder)) {
+                it.remove();
+
+                int remaining = remainingNodes.size();
+
+                if (log.isInfoEnabled()) {
+                    log.info("ZkDistributedCollectDataFuture removed remaining 
failed node [node=" + nodeOrder +
+                        ", remaining=" + remaining +
+                        ", futPath=" + futPath + ']');
+                }
+
+                if (remaining == 0) {
+                    completeAndNotifyListener();
+
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    class NodeResultsWatcher extends ZkAbstractWatcher implements 
AsyncCallback.Children2Callback {
+        /**
+         * @param rtState Runtime state.
+         * @param impl Discovery impl.
+         */
+        NodeResultsWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl 
impl) {
+            super(rtState, impl);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void process0(WatchedEvent evt) {
+            if (evt.getType() == Watcher.Event.EventType.NodeChildrenChanged)
+                rtState.zkClient.getChildrenAsync(evt.getPath(), this, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            if (!onProcessStart())
+                return;
+
+            try {
+                if (!isDone()) {
+                    assert rc == 0 : KeeperException.Code.get(rc);
+
+                    for (int i = 0; i < children.size(); i++) {
+                        Long nodeOrder = Long.parseLong(children.get(i));
+
+                        if (remainingNodes.remove(nodeOrder)) {
+                            int remaining = remainingNodes.size();
+
+                            if (log.isInfoEnabled()) {
+                                log.info("ZkDistributedCollectDataFuture added 
new result [node=" + nodeOrder +
+                                    ", remaining=" + remaining +
+                                    ", futPath=" + path + ']');
+                            }
+
+                            if (remaining == 0)
+                                completeAndNotifyListener();
+                        }
+                    }
+                }
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
new file mode 100644
index 0000000..9d8980e
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Custom discovery message that identifies a node by its internal ID and carries a warning text.
+ * NOTE(review): judging by the name it requests a forced node failure - confirm against the handler.
+ */
+public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Internal ID of the node this message refers to. */
+    final long nodeInternalId;
+
+    /** Warning to be displayed on all nodes. */
+    final String warning;
+
+    /**
+     * @param nodeInternalId Node ID.
+     * @param warning Warning to be displayed on all nodes.
+     */
+    ZkForceNodeFailMessage(long nodeInternalId, String warning) {
+        this.warning = warning;
+        this.nodeInternalId = nodeInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        // This message has no acknowledge message.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkForceNodeFailMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
new file mode 100644
index 0000000..642183b
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ * Builds and parses the ZooKeeper paths and znode names used by the discovery implementation.
+ * All directories are located directly under the cluster root passed to the constructor.
+ */
+class ZkIgnitePaths {
+    /** Separator between path elements. */
+    static final String PATH_SEPARATOR = "/";
+
+    /** Bit set in the alive node flags when the node is a client. */
+    private static final byte CLIENT_NODE_FLAG_MASK = 0x01;
+
+    /** Length of a UUID in string form. */
+    private static final int UUID_LEN = 36;
+
+    /** Directory to store joined node data. */
+    private static final String JOIN_DATA_DIR = "jd";
+
+    /** Directory to store new custom events. */
+    private static final String CUSTOM_EVTS_DIR = "ce";
+
+    /** Directory to store parts of multi-parts custom events. */
+    private static final String CUSTOM_EVTS_PARTS_DIR = "cp";
+
+    /** Directory to store acknowledge messages for custom events. */
+    private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
+
+    /** Directory to store EPHEMERAL znodes for alive cluster nodes. */
+    static final String ALIVE_NODES_DIR = "n";
+
+    /** Path to store discovery events {@link ZkDiscoveryEventsData}. */
+    private static final String DISCO_EVENTS_PATH = "e";
+
+    /** Base Zookeeper directory for all Ignite nodes. */
+    final String clusterDir;
+
+    /** Full path of the alive nodes directory. */
+    final String aliveNodesDir;
+
+    /** Full path of the joined node data directory. */
+    final String joinDataDir;
+
+    /** Full path of the discovery events znode. */
+    final String evtsPath;
+
+    /** Full path of the custom events directory. */
+    final String customEvtsDir;
+
+    /** Full path of the custom event parts directory. */
+    final String customEvtsPartsDir;
+
+    /** Full path of the custom event acknowledges directory. */
+    final String customEvtsAcksDir;
+
+    /**
+     * @param zkRootPath Base Zookeeper directory for all Ignite nodes.
+     */
+    ZkIgnitePaths(String zkRootPath) {
+        clusterDir = zkRootPath;
+
+        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
+        joinDataDir = zkPath(JOIN_DATA_DIR);
+        evtsPath = zkPath(DISCO_EVENTS_PATH);
+        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+        customEvtsPartsDir = zkPath(CUSTOM_EVTS_PARTS_DIR);
+        customEvtsAcksDir = zkPath(CUSTOM_EVTS_ACKS_DIR);
+    }
+
+    /**
+     * TODO ZK: copied from curator.
+     *
+     * Validate the provided znode path string. The path must be absolute, must not end with '/'
+     * (unless it is the root), must not contain empty, "." or ".." elements, the null character,
+     * or characters from the ranges U+0001..U+001F, U+007F..U+009F, U+D800..U+F8FF and
+     * U+FFF0..U+FFFF (bounds inclusive).
+     *
+     * @param path znode path string
+     * @return The given path if it was valid, for fluent chaining
+     * @throws IllegalArgumentException if the path is invalid
+     */
+    static String validatePath(String path) throws IllegalArgumentException {
+        if (path == null) {
+            throw new IllegalArgumentException("Path cannot be null");
+        }
+        if (path.length() == 0) {
+            throw new IllegalArgumentException("Path length must be > 0");
+        }
+        if (path.charAt(0) != '/') {
+            throw new IllegalArgumentException(
+                "Path must start with / character");
+        }
+        if (path.length() == 1) { // done checking - it's the root
+            return path;
+        }
+        if (path.charAt(path.length() - 1) == '/') {
+            throw new IllegalArgumentException(
+                "Path must not end with / character");
+        }
+
+        String reason = null;
+        char lastc = '/';
+        char chars[] = path.toCharArray();
+        char c;
+        for (int i = 1; i < chars.length; lastc = chars[i], i++) {
+            c = chars[i];
+
+            if (c == 0) {
+                reason = "null character not allowed @" + i;
+                break;
+            } else if (c == '/' && lastc == '/') {
+                reason = "empty node name specified @" + i;
+                break;
+            } else if (c == '.' && lastc == '.') {
+                // i >= 2 here: the character at index 0 is '/', so lastc can be '.' only from index 1 on.
+                if (chars[i-2] == '/' &&
+                    ((i + 1 == chars.length)
+                        || chars[i+1] == '/')) {
+                    reason = "relative paths not allowed @" + i;
+                    break;
+                }
+            } else if (c == '.') {
+                if (chars[i-1] == '/' &&
+                    ((i + 1 == chars.length)
+                        || chars[i+1] == '/')) {
+                    reason = "relative paths not allowed @" + i;
+                    break;
+                }
+            } else if (c > '\u0000' && c <= '\u001f'
+                || c >= '\u007f' && c <= '\u009F'
+                || c >= '\ud800' && c <= '\uf8ff'
+                || c >= '\ufff0' && c <= '\uffff') {
+                // Range bounds are inclusive: the boundary characters themselves are forbidden too.
+                reason = "invalid charater @" + i;
+                break;
+            }
+        }
+
+        if (reason != null) {
+            throw new IllegalArgumentException(
+                "Invalid path string \"" + path + "\" caused by " + reason);
+        }
+
+        return path;
+    }
+
+    /**
+     * @param path Relative path.
+     * @return Full path.
+     */
+    private String zkPath(String path) {
+        return clusterDir + "/" + path;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param prefixId Unique prefix ID.
+     * @return Path.
+     */
+    String joiningNodeDataPath(UUID nodeId, UUID prefixId) {
+        return joinDataDir + '/' + prefixId + ":" + nodeId.toString();
+    }
+
+    /**
+     * Parses the number that follows the last '|' of the alive node path.
+     *
+     * @param path Alive node zk path.
+     * @return Node internal ID.
+     */
+    static long aliveInternalId(String path) {
+        int idx = path.lastIndexOf('|');
+
+        // Parse as long to match the declared return type.
+        return Long.parseLong(path.substring(idx + 1));
+    }
+
+    /**
+     * Builds the path passed on alive node creation; it ends with '|', the sequence part
+     * described in {@link #aliveNodeId(String)} is not included.
+     *
+     * @param prefix Node unique path prefix.
+     * @param node Node.
+     * @return Path.
+     */
+    String aliveNodePathForCreate(String prefix, ZookeeperClusterNode node) {
+        byte flags = 0;
+
+        if (node.isClient())
+            flags |= CLIENT_NODE_FLAG_MASK;
+
+        return aliveNodesDir + "/" + prefix + ":" + node.id() + ":" + encodeFlags(flags) + "|";
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return {@code True} if node is client.
+     */
+    static boolean aliveNodeClientFlag(String path) {
+        return (aliveFlags(path) & CLIENT_NODE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * Reads the prefix from the very beginning of the given string, so the argument is expected
+     * to start with the prefix (i.e. to be the znode name without the parent directory).
+     *
+     * @param path Alive node zk path.
+     * @return Node unique prefix ID.
+     */
+    static UUID aliveNodePrefixId(String path) {
+        return UUID.fromString(path.substring(0, ZkIgnitePaths.UUID_LEN));
+    }
+
+    /**
+     * @param path Alive node zk path.
+     * @return Node ID.
+     */
+    static UUID aliveNodeId(String path) {
+        // <uuid prefix>:<node id>:<flags>|<alive seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param path Event zk path.
+     * @return Event sequence number.
+     */
+    static int customEventSequence(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    /**
+     * @param path Custom event zk path.
+     * @return Event node ID.
+     */
+    static UUID customEventSendNodeId(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    /**
+     * @param path Event path.
+     * @return Event unique prefix.
+     */
+    static String customEventPrefix(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+
+        return path.substring(0, ZkIgnitePaths.UUID_LEN);
+    }
+
+    /**
+     * Reads the four-character parts count written by {@link #createCustomEventPath(String, UUID, int)}.
+     *
+     * @param path Custom event zk path.
+     * @return Number of event parts.
+     */
+    static int customEventPartsCount(String path) {
+        // <uuid prefix>:<node id>:<partCnt>|<seq>
+        // Skip the prefix, the node ID and the two ':' separators.
+        int startIdx = 2 * ZkIgnitePaths.UUID_LEN + 2;
+
+        String cntStr = path.substring(startIdx, startIdx + 4);
+
+        int partCnt = Integer.parseInt(cntStr);
+
+        assert partCnt >= 1 : partCnt;
+
+        return partCnt;
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @param partCnt Parts count.
+     * @return Path.
+     */
+    String createCustomEventPath(String prefix, UUID nodeId, int partCnt) {
+        return customEvtsDir + "/" + prefix + ":" + nodeId + ":" + String.format("%04d", partCnt) + '|';
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @return Path.
+     */
+    String customEventPartsBasePath(String prefix, UUID nodeId) {
+        return customEvtsPartsDir + "/" + prefix + ":" + nodeId + ":";
+    }
+
+    /**
+     * @param prefix Prefix.
+     * @param nodeId Node ID.
+     * @param part Part number.
+     * @return Path.
+     */
+    String customEventPartPath(String prefix, UUID nodeId, int part) {
+        return customEventPartsBasePath(prefix, nodeId) + String.format("%04d", part);
+    }
+
+    /**
+     * @param evtId Event ID.
+     * @return Event zk path.
+     */
+    String joinEventDataPathForJoined(long evtId) {
+        return evtsPath + "/fj-" + evtId;
+    }
+
+    /**
+     * @param topVer Event topology version.
+     * @return Event zk path.
+     */
+    String joinEventSecuritySubjectPath(long topVer) {
+        return evtsPath + "/s-" + topVer;
+    }
+
+    /**
+     * @param origEvtId ID of original custom event.
+     * @return Path for custom event ack.
+     */
+    String ackEventDataPath(long origEvtId) {
+        assert origEvtId != 0;
+
+        return customEvtsAcksDir + "/" + String.valueOf(origEvtId);
+    }
+
+    /**
+     * @param id Future ID.
+     * @return Future path.
+     */
+    String distributedFutureBasePath(UUID id) {
+        return evtsPath + "/f-" + id;
+    }
+
+    /**
+     * @param id Future ID.
+     * @return Future result path.
+     */
+    String distributedFutureResultPath(UUID id) {
+        return evtsPath + "/fr-" + id;
+    }
+
+    /**
+     * Encodes the flags as exactly two hexadecimal characters.
+     *
+     * @param flags Flags.
+     * @return Flags string.
+     */
+    private static String encodeFlags(byte flags) {
+        // Shift the signed byte into the 0..255 range so that the hex form has no sign.
+        int intVal = flags + 128;
+
+        String str = Integer.toString(intVal, 16);
+
+        if (str.length() == 1)
+            str = '0' + str;
+
+        assert str.length() == 2  : str;
+
+        return str;
+    }
+
+    /**
+     * Decodes the two hexadecimal characters that follow the last ':' of the path.
+     *
+     * @param path Alive node zk path.
+     * @return Flags.
+     */
+    private static byte aliveFlags(String path) {
+        int startIdx = path.lastIndexOf(':') + 1;
+
+        String flagsStr = path.substring(startIdx, startIdx + 2);
+
+        // Reverse of the shift applied in encodeFlags.
+        return (byte)(Integer.parseInt(flagsStr, 16) - 128);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
new file mode 100644
index 0000000..a73312c
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Internal message that carries the internal ID of a joining node and a join error text.
+ */
+class ZkInternalJoinErrorMessage implements ZkInternalMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Not serialized; set to {@code true} when the message is constructed.
+     * NOTE(review): with standard Java deserialization a transient boolean is restored as
+     * {@code false} - presumably intended, confirm against the code reading this flag.
+     */
+    transient boolean notifyNode = true;
+
+    /** Joining node internal ID. */
+    final long nodeInternalId;
+
+    /** Error message. */
+    final String err;
+
+    /**
+     * @param nodeInternalId Joining node internal ID.
+     * @param err Error message.
+     */
+    ZkInternalJoinErrorMessage(long nodeInternalId, String err) {
+        this.nodeInternalId = nodeInternalId;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
new file mode 100644
index 0000000..c1d56f0
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+
+/**
+ * Marker interface for internal messages of the ZooKeeper discovery implementation. It declares
+ * no methods and only requires implementations to be {@link Serializable}.
+ */
+interface ZkInternalMessage extends Serializable {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
new file mode 100644
index 0000000..e4ae4ba0
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+class ZkJoinEventDataForJoined implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final List<ZookeeperClusterNode> top;
+
+    /** */
+    private final Map<Long, byte[]> discoData;
+
+    /** */
+    private final Map<Long, Long> dupDiscoData;
+
+    /**
+     * @param top Topology.
+     * @param discoData Discovery data.
+     */
+    ZkJoinEventDataForJoined(List<ZookeeperClusterNode> top, Map<Long, byte[]> 
discoData, @Nullable Map<Long, Long> dupDiscoData) {
+        assert top != null;
+        assert discoData != null && !discoData.isEmpty();
+
+        this.top = top;
+        this.discoData = discoData;
+        this.dupDiscoData = dupDiscoData;
+    }
+
+    byte[] discoveryDataForNode(long nodeOrder) {
+        assert discoData != null;
+
+        byte[] dataBytes = discoData.get(nodeOrder);
+
+        if (dataBytes != null)
+            return dataBytes;
+
+        assert dupDiscoData != null;
+
+        Long dupDataNode = dupDiscoData.get(nodeOrder);
+
+        assert dupDataNode != null;
+
+        dataBytes = discoData.get(dupDataNode);
+
+        assert dataBytes != null;
+
+        return dataBytes;
+    }
+
+    /**
+     * @return Current topology.
+     */
+    List<ZookeeperClusterNode> topology() {
+        assert top != null;
+
+        return top;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
new file mode 100644
index 0000000..8149afc
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinedNodeEvtData.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Serializable description of a node join event: the joined node identifiers plus the
+ * information needed to locate its join data.
+ */
+public class ZkJoinedNodeEvtData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version for node join event. */
+    final long topVer;
+
+    /** Joined node internal ID. */
+    final long joinedInternalId;
+
+    /** Joined node ID. */
+    final UUID nodeId;
+
+    /** Join data part count. */
+    final int joinDataPartCnt;
+
+    /** Security subject part count. */
+    final int secSubjPartCnt;
+
+    /** Join data unique prefix. */
+    final UUID joinDataPrefixId;
+
+    /** Joining node data; not serialized and not set by the constructor. */
+    transient ZkJoiningNodeData joiningNodeData;
+
+    /**
+     * @param topVer Topology version for node join event.
+     * @param nodeId Joined node ID.
+     * @param joinedInternalId Joined node internal ID.
+     * @param joinDataPrefixId Join data unique prefix.
+     * @param joinDataPartCnt Join data part count.
+     * @param secSubjPartCnt Security subject part count.
+     */
+    ZkJoinedNodeEvtData(
+        long topVer,
+        UUID nodeId,
+        long joinedInternalId,
+        UUID joinDataPrefixId,
+        int joinDataPartCnt,
+        int secSubjPartCnt)
+    {
+        this.nodeId = nodeId;
+        this.joinedInternalId = joinedInternalId;
+        this.topVer = topVer;
+
+        this.joinDataPrefixId = joinDataPrefixId;
+        this.joinDataPartCnt = joinDataPartCnt;
+        this.secSubjPartCnt = secSubjPartCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // The value printed as "order" is the topology version of the join event.
+        StringBuilder sb = new StringBuilder("ZkJoinedNodeData [id=");
+
+        sb.append(nodeId);
+        sb.append(", order=");
+        sb.append(topVer);
+        sb.append(']');
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
new file mode 100644
index 0000000..ff8311d
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable holder for joining node data. An instance is created either with a part count only
+ * (node and discovery data stay {@code null}) or with a node and its discovery data (part count stays {@code 0}).
+ */
+class ZkJoiningNodeData implements Serializable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Number of parts in multi-parts message. */
+    private int partCnt;
+
+    /** Node; included into {@link #toString()} output. */
+    @GridToStringInclude
+    private ZookeeperClusterNode node;
+
+    /** Discovery data; included into {@link #toString()} output. */
+    @GridToStringInclude
+    private Map<Integer, Serializable> discoData;
+
+    /**
+     * Creates an instance that carries only the part count.
+     *
+     * @param partCnt Number of parts in multi-parts message.
+     */
+    ZkJoiningNodeData(int partCnt) {
+        this.partCnt = partCnt;
+    }
+
+    /**
+     * Creates an instance that carries the node and its discovery data.
+     * Both arguments and the node ID are checked with assertions only.
+     *
+     * @param node Node.
+     * @param discoData Discovery data.
+     */
+    ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> 
discoData) {
+        assert node != null && node.id() != null : node;
+        assert discoData != null;
+
+        this.node = node;
+        this.discoData = discoData;
+    }
+
+    /**
+     * @return Number of parts in multi-parts message.
+     */
+    int partCount() {
+        return partCnt;
+    }
+
+    /**
+     * @return Node ({@code null} if the instance was created with a part count only).
+     */
+    ZookeeperClusterNode node() {
+        return node;
+    }
+
+    /**
+     * @return Discovery data ({@code null} if the instance was created with a part count only).
+     */
+    Map<Integer, Serializable> discoveryData() {
+        return discoData;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkJoiningNodeData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
new file mode 100644
index 0000000..626fe74
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal custom discovery message without any state: {@link #ackMessage()} returns {@code null},
+ * {@link #isMutable()} and {@link #stopProcess()} return {@code false}.
+ * NOTE(review): judging by the name it reports that no server nodes are left - confirm against the sender.
+ */
+class ZkNoServersMessage implements DiscoverySpiCustomMessage, 
ZkInternalMessage {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkNoServersMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
new file mode 100644
index 0000000..2abfee3
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNodeValidateResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Node validation result: holds either an error message or a marshalled security subject.
+ * Each constructor sets exactly one of the two fields, the other one stays {@code null}.
+ */
+class ZkNodeValidateResult {
+    /** Error ({@code null} if the instance was created with security subject bytes). */
+    String err;
+
+    /** Marshalled security subject ({@code null} if the instance was created with an error). */
+    byte[] secSubjZipBytes;
+
+    /**
+     * Creates a result that carries an error.
+     *
+     * @param err Error.
+     */
+    ZkNodeValidateResult(String err) {
+        this.err = err;
+    }
+
+    /**
+     * Creates a result that carries a marshalled security subject; the array is stored without copying.
+     *
+     * @param secSubjZipBytes Marshalled security subject.
+     */
+    ZkNodeValidateResult(byte[] secSubjZipBytes) {
+        this.secSubjZipBytes = secSubjZipBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
new file mode 100644
index 0000000..fb6cf89
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRunnable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * {@link Runnable} whose body ({@link #run0()}) is executed between the process start/end hooks
+ * inherited from the parent callback class.
+ */
+public abstract class ZkRunnable extends ZkAbstractCallabck implements Runnable {
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkRunnable(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        super(rtState, impl);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing if {@code onProcessStart()} returns {@code false}. Otherwise runs {@link #run0()}
+     * followed by {@code onProcessEnd()}; anything thrown by either of them is passed to {@code onProcessError}.
+     */
+    @Override public void run() {
+        if (onProcessStart()) {
+            try {
+                run0();
+
+                onProcessEnd();
+            }
+            catch (Throwable err) {
+                onProcessError(err);
+            }
+        }
+    }
+
+    /**
+     * Actual work of this runnable.
+     *
+     * @throws Exception If failed.
+     */
+    protected abstract void run0() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
new file mode 100644
index 0000000..6792154
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Mutable holder of runtime state shared by the ZooKeeper discovery classes of this package.
+ * Most fields are package-private and accessed directly.
+ */
+class ZkRuntimeState {
+    /** Watcher, set by {@link #init(ZkWatcher, ZkAliveNodeDataWatcher)}. */
+    ZkWatcher watcher;
+
+    /** Alive nodes data watcher, set by {@link #init(ZkWatcher, ZkAliveNodeDataWatcher)}. */
+    ZkAliveNodeDataWatcher aliveNodeDataWatcher;
+
+    /** Error passed to {@link #onCloseStart(Exception)}. */
+    volatile Exception errForClose;
+
+    /** {@code True} if joined topology before reconnect attempt. */
+    final boolean prevJoined;
+
+    /** ZooKeeper client; may be {@code null} ({@link #onCloseStart(Exception)} checks for that). */
+    ZookeeperClient zkClient;
+
+    /** Internal order. NOTE(review): presumably of the local node - confirm. */
+    long internalOrder;
+
+    /** Join data part count. NOTE(review): presumably for the local node join data - confirm. */
+    int joinDataPartCnt;
+
+    /** Grid start time. */
+    long gridStartTime;
+
+    /** Joined flag. NOTE(review): presumably set once the local node joined topology - confirm. */
+    volatile boolean joined;
+
+    /** Discovery events data. */
+    ZkDiscoveryEventsData evtsData;
+
+    /** Coordinator flag. NOTE(review): presumably {@code true} while the local node is coordinator - confirm. */
+    boolean crd;
+
+    /** Local node ZooKeeper path. */
+    String locNodeZkPath;
+
+    /** Local node alive data. */
+    final ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+    /** Processed events count. NOTE(review): presumably the counter updated by {@link #procEvtsUpdateTo} - confirm. */
+    int procEvtCnt;
+
+    /** Cluster nodes. */
+    final ZkClusterNodes top = new ZkClusterNodes();
+
+    /** Nodes related to communication error processing. NOTE(review): exact meaning is not visible here - confirm. */
+    List<ClusterNode> commErrProcNodes;
+
+    /** Timeout callback registering watcher for join error
+     * (set this watcher after timeout as a minor optimization).
+     */
+    ZkTimeoutObject joinErrTo;
+
+    /** Timeout callback set to wait for join timeout. */
+    ZkTimeoutObject joinTo;
+
+    /** Timeout callback to update processed events counter. */
+    ZkTimeoutObject procEvtsUpdateTo;
+
+    /**
+     * @param prevJoined {@code True} if joined topology before reconnect attempt.
+     */
+    ZkRuntimeState(boolean prevJoined) {
+        this.prevJoined = prevJoined;
+    }
+
+    /**
+     * Stores both watchers.
+     *
+     * @param watcher Watcher.
+     * @param aliveNodeDataWatcher Alive nodes data watcher.
+     */
+    void init(ZkWatcher watcher, ZkAliveNodeDataWatcher aliveNodeDataWatcher) {
+        this.watcher = watcher;
+        this.aliveNodeDataWatcher = aliveNodeDataWatcher;
+    }
+
+    /**
+     * Records the close error and, if the ZooKeeper client is already set, notifies it that close has started.
+     * The client reference is read once into a local variable before the {@code null} check.
+     *
+     * @param err Error.
+     */
+    void onCloseStart(Exception err) {
+        assert err != null;
+
+        errForClose = err;
+
+        ZookeeperClient zkClient = this.zkClient;
+
+        if (zkClient != null)
+            zkClient.onCloseStart();
+    }
+
+    /**
+     * Combines the watcher, children and data callback interfaces into a single type.
+     */
+    interface ZkWatcher extends Watcher, AsyncCallback.Children2Callback, 
AsyncCallback.DataCallback {
+        // No-op.
+    }
+
+    /**
+     * Combines the watcher and data callback interfaces into a single type.
+     */
+    interface ZkAliveNodeDataWatcher extends Watcher, 
AsyncCallback.DataCallback {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
new file mode 100644
index 0000000..4d3d5b4
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkTimeoutObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+
+/**
+ * Base class for timeout objects: provides a random ID, an end time computed at construction
+ * and a {@link #cancelled} flag.
+ */
+abstract class ZkTimeoutObject implements IgniteSpiTimeoutObject {
+    /** Timeout object ID. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** End time. */
+    private final long endTime;
+
+    /** Cancelled flag. */
+    volatile boolean cancelled;
+
+    /**
+     * Computes end time as current time plus timeout. A negative timeout, or a sum that
+     * overflows into a negative value, results in {@link Long#MAX_VALUE}.
+     *
+     * @param timeout Timeout.
+     */
+    ZkTimeoutObject(long timeout) {
+        long deadline = Long.MAX_VALUE;
+
+        if (timeout >= 0) {
+            long sum = System.currentTimeMillis() + timeout;
+
+            // A negative sum means the addition overflowed, keep the maximum then.
+            if (sum >= 0)
+                deadline = sum;
+        }
+
+        endTime = deadline;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final long endTime() {
+        return endTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
new file mode 100644
index 0000000..786d997
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -0,0 +1,1196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * TODO ZK: limit reconnect attempts.
+ */
+public class ZookeeperClient implements Watcher {
+    /**
+     * Retry timeout, read from the {@code IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT} system property
+     * (default {@code 1000}). NOTE(review): presumably milliseconds - confirm at the usage site.
+     */
+    private static final long RETRY_TIMEOUT =
+        
IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 
1000);
+
+    /** Max request size, the limit used by {@code needSplitNodeData} and {@code splitNodeData}. */
+    private static final int MAX_REQ_SIZE = 1048528;
+
+    /** ACL passed to all create requests of this client. */
+    private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    /** Empty node data, used when no data is provided. */
+    private static final byte[] EMPTY_BYTES = {};
+
+    /** ZooKeeper handle created in constructor. */
+    private final ZooKeeper zk;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Connection state, updated under {@link #stateMux}. */
+    private ConnectionState state = ConnectionState.Disconnected;
+
+    /** Connection loss timeout, initialized with the session timeout in constructor. */
+    private long connLossTimeout;
+
+    /** Time when the client was created or the state last changed to disconnected ({@code System.currentTimeMillis()}). */
+    private volatile long connStartTime;
+
+    /** Mutex for connection state changes. */
+    private final Object stateMux = new Object();
+
+    /** Lost connection callback. */
+    private final IgniteRunnable connLostC;
+
+    /** Timer created in constructor with name {@code zk-client-timer-<igniteInstanceName>}. */
+    private final Timer connTimer;
+
+    /** Operations which are executed when state changes to connected. */
+    private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>();
+
+    /** Closing flag; watcher events are ignored when it is set. */
+    private volatile boolean closing;
+
+    /**
+     * Creates a client without Ignite instance name ({@code null} is passed to the main constructor).
+     *
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(IgniteLogger log, String connectString, int sesTimeout, 
IgniteRunnable connLostC) throws Exception {
+        this(null, log, connectString, sesTimeout, connLostC);
+    }
+
+    /**
+     * Creates the timer and the ZooKeeper handle (this client is registered as its watcher) and, if the
+     * client is still disconnected afterwards, schedules the connection check.
+     * <p>
+     * The instance name is concatenated into the timer name and into the temporary thread name, so for
+     * a {@code null} name these names contain the text {@code null}.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(String igniteInstanceName,
+        IgniteLogger log,
+        String connectString,
+        int sesTimeout,
+        IgniteRunnable connLostC)
+        throws Exception
+    {
+        this.log = log.getLogger(getClass());
+        this.connLostC = connLostC;
+
+        connLossTimeout = sesTimeout;
+
+        long connStartTime = this.connStartTime = System.currentTimeMillis();
+
+        connTimer = new Timer("zk-client-timer-" + igniteInstanceName);
+
+        String threadName = Thread.currentThread().getName();
+
+        // ZK generates internal threads' names using current thread name.
+        Thread.currentThread().setName("zk-" + igniteInstanceName);
+
+        try {
+            zk = new ZooKeeper(connectString, sesTimeout, this);
+        }
+        finally {
+            Thread.currentThread().setName(threadName);
+        }
+
+        synchronized (stateMux) {
+            if (connStartTime == this.connStartTime && state == 
ConnectionState.Disconnected)
+                scheduleConnectionCheck();
+        }
+    }
+
+    /**
+     * @return Underlying ZooKeeper handle created in constructor.
+     */
+    ZooKeeper zk() {
+        return zk;
+    }
+
+    /**
+     * Reads the connection state under the state mutex.
+     *
+     * @return {@code True} if connected to ZooKeeper.
+     */
+    boolean connected() {
+        ConnectionState curState;
+
+        synchronized (stateMux) {
+            curState = state;
+        }
+
+        return curState == ConnectionState.Connected;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Handles only connection state events ({@code EventType.None}); all events are ignored while closing.
+     * The new state is computed and applied under {@code stateMux}, nothing more is done if it did not change.
+     * Closing the client with the lost connection notification, and execution of the queued operations,
+     * happen after the mutex is released.
+     * <p>
+     * NOTE(review): {@code retryQ} is iterated here without {@code stateMux} and is not cleared by this
+     * method - confirm how the queue is guarded and drained by the code which fills it.
+     */
+    @Override public void process(WatchedEvent evt) {
+        if (closing)
+            return;
+
+        if (evt.getType() == Event.EventType.None) {
+            ConnectionState newState;
+
+            synchronized (stateMux) {
+                if (state == ConnectionState.Lost) {
+                    U.warn(log, "Received event after connection was lost 
[evtState=" + evt.getState() + "]");
+
+                    return;
+                }
+
+                if (!zk.getState().isAlive())
+                    return;
+
+                Event.KeeperState zkState = evt.getState();
+
+                switch (zkState) {
+                    case SaslAuthenticated:
+                        return; // No-op.
+
+                    case AuthFailed:
+                        newState = state;
+
+                        break;
+
+                    case Disconnected:
+                        newState = ConnectionState.Disconnected;
+
+                        break;
+
+                    case SyncConnected:
+                        newState = ConnectionState.Connected;
+
+                        break;
+
+                    case Expired:
+                        newState = ConnectionState.Lost;
+
+                        break;
+
+                    default:
+                        U.error(log, "Unexpected state for ZooKeeper client, 
close connection: " + zkState);
+
+                        newState = ConnectionState.Lost;
+                }
+
+                if (newState != state) {
+                    if (log.isInfoEnabled())
+                        log.info("ZooKeeper client state changed [prevState=" 
+ state + ", newState=" + newState + ']');
+
+                    state = newState;
+
+                    if (newState == ConnectionState.Disconnected) {
+                        connStartTime = System.currentTimeMillis();
+
+                        scheduleConnectionCheck();
+                    }
+                    else if (newState == ConnectionState.Connected)
+                        stateMux.notifyAll();
+                    else
+                        assert state == ConnectionState.Lost : state;
+                }
+                else
+                    return;
+            }
+
+            if (newState == ConnectionState.Lost) {
+                closeClient();
+
+                notifyConnectionLost();
+            }
+            else if (newState == ConnectionState.Connected) {
+                for (ZkAsyncOperation op : retryQ)
+                    op.execute();
+            }
+        }
+    }
+
+    /**
+     * Runs the lost connection callback if the client is not closing, the callback is set
+     * and the connection state is {@code Lost}.
+     */
+    private void notifyConnectionLost() {
+        if (closing || connLostC == null)
+            return;
+
+        boolean lost;
+
+        // State is not volatile and is guarded by the state mutex, read it under the mutex.
+        synchronized (stateMux) {
+            lost = state == ConnectionState.Lost;
+        }
+
+        // Callback is user code, it is invoked after the mutex is released.
+        if (lost)
+            connLostC.run();
+    }
+
+    /**
+     * Checks if node exists. A failed request is passed to {@code onZookeeperError} and is repeated
+     * if that call returns normally.
+     *
+     * @param path Path.
+     * @return {@code True} if node exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException {
+        while (true) {
+            // Value read before the request, passed to the error handler on failure.
+            long startTs = connStartTime;
+
+            try {
+                Stat stat = zk.exists(path, false);
+
+                return stat != null;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+        }
+    }
+
+    /**
+     * Creates all given nodes with empty data using a single multi request. A failed request
+     * (except for 'node exists' error) is passed to {@code onZookeeperError} and is repeated
+     * if that call returns normally.
+     *
+     * @param paths Paths to create.
+     * @param createMode Create mode.
+     * @throws KeeperException.NodeExistsException If at least one of target node already exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void createAll(List<String> paths, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException
+    {
+        // TODO ZK: need check for max size?
+        List<Op> createOps = new ArrayList<>(paths.size());
+
+        for (String nodePath : paths)
+            createOps.add(Op.create(nodePath, EMPTY_BYTES, ZK_ACL, createMode));
+
+        while (true) {
+            long startTs = connStartTime;
+
+            try {
+                zk.multi(createOps);
+
+                break;
+            }
+            catch (KeeperException.NodeExistsException err) {
+                // Reported to the caller as is, no retry.
+                throw err;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+        }
+    }
+
+    /**
+     * Compares estimated request size (request overhead for the path, data length and extra overhead)
+     * with the max request size.
+     *
+     * @param path Path.
+     * @param data Data.
+     * @param overhead Extra overhead.
+     * @return {@code True} If data size exceeds max request size and should be splitted into multiple parts.
+     */
+    boolean needSplitNodeData(String path, byte[] data, int overhead) {
+        int reqSize = requestOverhead(path) + data.length + overhead;
+
+        return reqSize > MAX_REQ_SIZE;
+    }
+
+    /**
+     * Splits data into consecutive parts. Max part size is the max request size minus request overhead
+     * for the path and extra overhead; only the last part can be shorter.
+     *
+     * @param path Path.
+     * @param data Data.
+     * @param overhead Extra overhead.
+     * @return Splitted data.
+     */
+    List<byte[]> splitNodeData(String path, byte[] data, int overhead) {
+        int maxPartSize = MAX_REQ_SIZE - requestOverhead(path) - overhead;
+
+        int total = data.length;
+
+        // Number of parts rounded up.
+        int cnt = total / maxPartSize + (total % maxPartSize != 0 ? 1 : 0);
+
+        assert cnt > 1 : "Do not need split";
+
+        List<byte[]> res = new ArrayList<>(cnt);
+
+        int left = total;
+
+        for (int idx = 0, off = 0; idx < cnt; idx++, off += maxPartSize) {
+            byte[] chunk = new byte[Math.min(left, maxPartSize)];
+
+            System.arraycopy(data, off, chunk, 0, chunk.length);
+
+            left -= chunk.length;
+
+            res.add(chunk);
+        }
+
+        assert left == 0 : left;
+
+        return res;
+    }
+
+    /**
+     * TODO ZK: it seems not always precise, e.g. if ACL is used?
+     * Currently the overhead is estimated as the number of characters in the path.
+     *
+     * @param path Request path.
+     * @return Marshalled request overhead.
+     */
+    private int requestOverhead(String path) {
+        return path.length();
+    }
+
+    /**
+     * Creates non-sequential node; if node already exists then just returns the given path.
+     * Any other failure is passed to {@code onZookeeperError} and the request is repeated
+     * if that call returns normally.
+     *
+     * @param path Path.
+     * @param data Data ({@code null} is replaced with empty array).
+     * @param createMode Create mode.
+     * @return Created path.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    String createIfNeeded(String path, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert !createMode.isSequential() : createMode;
+
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        while (true) {
+            long startTs = connStartTime;
+
+            try {
+                return zk.create(path, payload, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+        }
+    }
+
+    /**
+     * Creates sequential node. Before every attempt except the first one the children of the parent
+     * node are checked: if some child name starts with the given prefix then its path is returned
+     * and nothing is created.
+     *
+     * @param checkPrefix Unique prefix to check in case of retry.
+     * @param parentPath Parent node path.
+     * @param path Node to create.
+     * @param data Node data ({@code null} is replaced with empty array).
+     * @param createMode Create mode.
+     * @return Create path.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    String createSequential(String checkPrefix, String parentPath, String path, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert createMode.isSequential() : createMode;
+
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        boolean retry = false;
+
+        while (true) {
+            long startTs = connStartTime;
+
+            try {
+                if (retry) {
+                    // Previous attempt could create the node even though the request failed.
+                    for (String child : zk.getChildren(parentPath, false)) {
+                        if (!child.startsWith(checkPrefix))
+                            continue;
+
+                        String resPath = parentPath + "/" + child;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Check before retry, node already created: " + resPath);
+
+                        return resPath;
+                    }
+                }
+
+                return zk.create(path, payload, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException ignored) {
+                assert !createMode.isSequential() : createMode;
+
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+
+            retry = true;
+        }
+    }
+
+    /**
+     * Gets children of the node without setting a watch. A failed request is passed to
+     * {@code onZookeeperError} and is repeated if that call returns normally.
+     *
+     * @param path Path.
+     * @return Children nodes.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    List<String> getChildren(String path)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            long startTs = connStartTime;
+
+            try {
+                List<String> children = zk.getChildren(path, false);
+
+                return children;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+        }
+    }
+
+    /**
+     * Checks if node exists with a single request, errors are thrown to the caller.
+     *
+     * @param path Path.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     * @return {@code True} if given path exists.
+     */
+    boolean existsNoRetry(String path) throws InterruptedException, KeeperException {
+        Stat stat = zk.exists(path, false);
+
+        return stat != null;
+    }
+
+    /**
+     * Deletes node with a single request, absence of the node is not an error.
+     *
+     * @param path Path.
+     * @param ver Expected version.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     */
+    void deleteIfExistsNoRetry(String path, int ver) throws InterruptedException, KeeperException {
+        try {
+            zk.delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException ignored) {
+            // Nothing to delete, node does not exist.
+        }
+    }
+
+    /**
+     * Deletes node using {@code delete(String, int)}, absence of the node is not an error.
+     *
+     * @param path Path.
+     * @param ver Version.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void deleteIfExists(String path, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        try {
+            delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException ignored) {
+            // Nothing to delete, node does not exist.
+        }
+    }
+
+    /**
+     * Deletes all given nodes using a single multi request, does nothing for an empty list.
+     * A failed request (except for 'no node' error) is passed to {@code onZookeeperError}
+     * and is repeated if that call returns normally.
+     *
+     * @param parent Parent path, if not {@code null} then it is prepended (with '/') to each path.
+     * @param paths Children paths.
+     * @param ver Version.
+     * @throws KeeperException.NoNodeException If at least one of nodes does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void deleteAll(@Nullable String parent, List<String> paths, int ver)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        if (paths.isEmpty())
+            return;
+
+        // TODO ZK: need check for max size?
+        List<Op> delOps = new ArrayList<>(paths.size());
+
+        for (String child : paths) {
+            String fullPath;
+
+            if (parent != null)
+                fullPath = parent + "/" + child;
+            else
+                fullPath = child;
+
+            delOps.add(Op.delete(fullPath, ver));
+        }
+
+        while (true) {
+            long startTs = connStartTime;
+
+            try {
+                zk.multi(delOps);
+
+                break;
+            }
+            catch (KeeperException.NoNodeException err) {
+                // Reported to the caller as is, no retry.
+                throw err;
+            }
+            catch (Exception err) {
+                onZookeeperError(startTs, err);
+            }
+        }
+    }
+
+    /**
+     * Deletes a znode, retrying the call for as long as
+     * {@link #onZookeeperError(long, Exception)} returns normally.
+     *
+     * @param path Path.
+     * @param ver Version.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void delete(String path, int ver)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            // Remember which connection the attempt was made on.
+            long attemptConnTime = this.connStartTime;
+
+            try {
+                zk.delete(path, ver);
+
+                return;
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(attemptConnTime, e);
+            }
+        }
+    }
+
+    /**
+     * Writes znode data, retrying the call for as long as
+     * {@link #onZookeeperError(long, Exception)} returns normally.
+     *
+     * @param path Path.
+     * @param data Data, {@code null} is replaced with {@code EMPTY_BYTES}.
+     * @param ver Version.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException.NoNodeException If node does not exist.
+     * @throws KeeperException.BadVersionException If version does not match.
+     */
+    void setData(String path, byte[] data, int ver)
+        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NoNodeException,
+        KeeperException.BadVersionException
+    {
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        while (true) {
+            // Remember which connection the attempt was made on.
+            long attemptConnTime = this.connStartTime;
+
+            try {
+                zk.setData(path, payload, ver);
+
+                return;
+            }
+            catch (KeeperException.BadVersionException e) {
+                throw e;
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(attemptConnTime, e);
+            }
+        }
+    }
+
+    /**
+     * Reads znode data without setting a watch, retrying the call for as long as
+     * {@link #onZookeeperError(long, Exception)} returns normally.
+     *
+     * @param path Path.
+     * @param stat Optional {@link Stat} instance to return znode state.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    byte[] getData(String path, @Nullable Stat stat)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        while (true) {
+            // Remember which connection the attempt was made on.
+            long attemptConnTime = this.connStartTime;
+
+            try {
+                return zk.getData(path, false, stat);
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(attemptConnTime, e);
+            }
+        }
+    }
+
+    /**
+     * Reads znode data when the caller does not need the znode {@link Stat}.
+     *
+     * @param path Path.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    byte[] getData(String path)
+        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
+    {
+        byte[] bytes = getData(path, null);
+
+        return bytes;
+    }
+
+    /**
+     * Starts asynchronous deletion of a znode; see {@link DeleteIfExistsOperation} for result handling.
+     *
+     * @param path Path.
+     */
+    void deleteIfExistsAsync(String path) {
+        ZkAsyncOperation delOp = new DeleteIfExistsOperation(path);
+
+        delOp.execute();
+    }
+
+    /**
+     * Submits an asynchronous {@code exists} request. The user callback is wrapped into
+     * {@link StatCallbackWrapper}, which handles retriable errors before delegating to it.
+     *
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) {
+        AsyncCallback.StatCallback wrappedCb = new StatCallbackWrapper(new ExistsOperation(path, watcher, cb));
+
+        zk.exists(path, watcher, wrappedCb, null);
+    }
+
+    /**
+     * Submits an asynchronous {@code getChildren} request. The user callback is wrapped into
+     * {@link ChildrenCallbackWrapper}, which handles retriable errors before delegating to it.
+     *
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) {
+        AsyncCallback.Children2Callback wrappedCb =
+            new ChildrenCallbackWrapper(new GetChildrenOperation(path, watcher, cb));
+
+        zk.getChildren(path, watcher, wrappedCb, null);
+    }
+
+    /**
+     * Submits an asynchronous {@code getData} request. The user callback is wrapped into
+     * {@link DataCallbackWrapper}, which handles retriable errors before delegating to it.
+     *
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) {
+        AsyncCallback.DataCallback wrappedCb = new DataCallbackWrapper(new GetDataOperation(path, watcher, cb));
+
+        zk.getData(path, watcher, wrappedCb, null);
+    }
+
+    /**
+     * Submits an asynchronous {@code create} request. The user callback is wrapped into
+     * {@link CreateCallbackWrapper}, which handles retriable errors before delegating to it.
+     *
+     * @param path Path.
+     * @param data Data, {@code null} is replaced with {@code EMPTY_BYTES}.
+     * @param createMode Create mode.
+     * @param cb Callback.
+     */
+    private void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
+        byte[] payload = data != null ? data : EMPTY_BYTES;
+
+        AsyncCallback.StringCallback wrappedCb =
+            new CreateCallbackWrapper(new CreateOperation(path, payload, createMode, cb));
+
+        zk.create(path, payload, ZK_ACL, createMode, wrappedCb, null);
+    }
+
+    /**
+     * Marks the client as closing and wakes up all threads waiting on {@code stateMux}.
+     * <p>
+     * A thread waiting for retry in {@link #onZookeeperError(long, Exception)} re-checks the
+     * {@code closing} flag after it wakes up and fails with {@link ZookeeperClientFailedException}.
+     */
+    void onCloseStart() {
+        // Raise the flag before notifying, so that awakened waiters can see it.
+        closing = true;
+
+        synchronized (stateMux) {
+            stateMux.notifyAll();
+        }
+    }
+
+    /**
+     * Closes the client; delegates to {@link #closeClient()}.
+     */
+    public void close() {
+        closeClient();
+    }
+
+    /**
+     * Handles a failure of a synchronous ZooKeeper operation and decides whether the caller may retry.
+     * <p>
+     * If the method returns normally, the error was retriable and the caller is expected to repeat
+     * the operation (the method has already waited up to {@code RETRY_TIMEOUT} on {@code stateMux}).
+     * Otherwise {@link ZookeeperClientFailedException} is thrown: immediately if the client is closing
+     * or the connection is already lost, or after closing the client and notifying about connection
+     * loss if the error is not retriable or {@code connLossTimeout} has elapsed.
+     *
+     * @param prevConnStartTime Value of {@code connStartTime} read by the caller before the failed attempt.
+     * @param e Error.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void onZookeeperError(long prevConnStartTime, Exception e)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        // Set when the client must be closed; close and notification happen outside of the monitor.
+        ZookeeperClientFailedException err = null;
+
+        synchronized (stateMux) {
+            if (closing)
+                throw new ZookeeperClientFailedException("ZooKeeper client is 
closed.");
+
+            U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", 
state=" + state + ']');
+
+            // Connection was already declared lost, the client is not closed again here.
+            if (state == ConnectionState.Lost) {
+                U.error(log, "Operation failed with unexpected error, 
connection lost: " + e, e);
+
+                throw new ZookeeperClientFailedException(e);
+            }
+
+            // Only KeeperException with a code accepted by needRetry() is retried.
+            boolean retry = (e instanceof KeeperException) && 
needRetry(((KeeperException)e).code().intValue());
+
+            if (retry) {
+                long remainingTime;
+
+                // First failure noticed for the connection the caller used: start the disconnect countdown.
+                if (state == ConnectionState.Connected && connStartTime == 
prevConnStartTime) {
+                    state = ConnectionState.Disconnected;
+
+                    connStartTime = System.currentTimeMillis();
+
+                    remainingTime = connLossTimeout;
+                }
+                else {
+                    // NOTE(review): this branch assumes state is Disconnected; if state can be Connected
+                    // with a different connStartTime at this point, the assertion below fails - confirm
+                    // against the code that updates state on connection events.
+                    assert connStartTime != 0;
+
+                    assert state == ConnectionState.Disconnected : state;
+
+                    remainingTime = connLossTimeout - 
(System.currentTimeMillis() - connStartTime);
+
+                    // Disconnected for longer than allowed: give up.
+                    if (remainingTime <= 0) {
+                        state = ConnectionState.Lost;
+
+                        U.warn(log, "Failed to establish ZooKeeper connection, 
close client " +
+                            "[timeout=" + connLossTimeout + ']');
+
+                        err = new ZookeeperClientFailedException(e);
+                    }
+                }
+
+                if (err == null) {
+                    U.warn(log, "ZooKeeper operation failed, will retry [err=" 
+ e +
+                        ", retryTimeout=" + RETRY_TIMEOUT +
+                        ", connLossTimeout=" + connLossTimeout +
+                        ", path=" + ((KeeperException)e).getPath() +
+                        ", remainingWaitTime=" + remainingTime + ']');
+
+                    // Releases the monitor while waiting; onCloseStart() notifies this monitor.
+                    stateMux.wait(RETRY_TIMEOUT);
+
+                    if (closing)
+                        throw new ZookeeperClientFailedException("ZooKeeper 
client is closed.");
+                }
+            }
+            else {
+                U.error(log, "Operation failed with unexpected error, close 
ZooKeeper client: " + e, e);
+
+                state = ConnectionState.Lost;
+
+                err = new ZookeeperClientFailedException(e);
+            }
+        }
+
+        // Performed after the monitor is released.
+        if (err != null) {
+            closeClient();
+
+            notifyConnectionLost();
+
+            throw err;
+        }
+    }
+
+    /**
+     * Tells whether an operation that failed with the given code may be repeated. Retriable codes are
+     * {@code CONNECTIONLOSS}, {@code SESSIONMOVED} and {@code OPERATIONTIMEOUT}.
+     *
+     * @param code Zookeeper error code.
+     * @return {@code True} if can retry operation.
+     */
+    private boolean needRetry(int code) {
+        if (code == KeeperException.Code.CONNECTIONLOSS.intValue())
+            return true;
+
+        if (code == KeeperException.Code.SESSIONMOVED.intValue())
+            return true;
+
+        return code == KeeperException.Code.OPERATIONTIMEOUT.intValue();
+    }
+
+    /**
+     * Closes the underlying ZooKeeper handle and cancels the connection timer.
+     * <p>
+     * A failure to close the handle is logged and not propagated. The timer is cancelled in a
+     * {@code finally} block, so it is stopped even if closing or logging fails with an
+     * error that is not an {@link Exception}.
+     */
+    private void closeClient() {
+        try {
+            zk.close();
+        }
+        catch (Exception closeErr) {
+            U.warn(log, "Failed to close ZooKeeper client: " + closeErr, closeErr);
+        }
+        finally {
+            // Always release the timer, regardless of how the close attempt ended.
+            connTimer.cancel();
+        }
+    }
+
+    /**
+     * Schedules a {@link ConnectionTimeoutTask} to run after {@code connLossTimeout} for the current
+     * {@code connStartTime}. Expected to be called in {@code Disconnected} state.
+     */
+    private void scheduleConnectionCheck() {
+        assert state == ConnectionState.Disconnected : state;
+
+        TimerTask timeoutTask = new ConnectionTimeoutTask(connStartTime);
+
+        connTimer.schedule(timeoutTask, connLossTimeout);
+    }
+
+    /**
+     * Asynchronous ZooKeeper operation that can be submitted on demand. Callback wrappers
+     * in this class put such operations into {@code retryQ} when a retriable error is reported.
+     */
+    interface ZkAsyncOperation {
+        /**
+         * Submits the operation to ZooKeeper.
+         */
+        void execute();
+    }
+
+    /**
+     * Holds arguments of {@link #getChildrenAsync} so that the same request can be issued again.
+     */
+    class GetChildrenOperation implements ZkAsyncOperation {
+        /** Path to list children of. */
+        private final String path;
+
+        /** Watcher passed to the request. */
+        private final Watcher watcher;
+
+        /** User callback receiving the result. */
+        private final AsyncCallback.Children2Callback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetChildrenOperation(String path, Watcher watcher, 
AsyncCallback.Children2Callback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            getChildrenAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Holds arguments of {@link #getDataAsync} so that the same request can be issued again.
+     */
+    class GetDataOperation implements ZkAsyncOperation {
+        /** Path to read data from. */
+        private final String path;
+
+        /** Watcher passed to the request. */
+        private final Watcher watcher;
+
+        /** User callback receiving the result. */
+        private final AsyncCallback.DataCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetDataOperation(String path, Watcher watcher, 
AsyncCallback.DataCallback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            getDataAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Holds arguments of {@link #existsAsync} so that the same request can be issued again.
+     */
+    class ExistsOperation implements ZkAsyncOperation {
+        /** Path to check. */
+        private final String path;
+
+        /** Watcher passed to the request. */
+        private final Watcher watcher;
+
+        /** User callback receiving the result. */
+        private final AsyncCallback.StatCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        ExistsOperation(String path, Watcher watcher, 
AsyncCallback.StatCallback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            existsAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     * Holds arguments of {@link #createAsync} so that the same request can be issued again.
+     */
+    class CreateOperation implements ZkAsyncOperation {
+        /** Path to create. */
+        private final String path;
+
+        /** Data for the new znode. */
+        private final byte[] data;
+
+        /** Create mode. */
+        private final CreateMode createMode;
+
+        /** User callback receiving the result; {@link CreateCallbackWrapper} tolerates {@code null} here. */
+        private final AsyncCallback.StringCallback cb;
+
+        /**
+         * @param path path.
+         * @param data Data.
+         * @param createMode Create mode.
+         * @param cb Callback.
+         */
+        CreateOperation(String path, byte[] data, CreateMode createMode, 
AsyncCallback.StringCallback cb) {
+            this.path = path;
+            this.data = data;
+            this.createMode = createMode;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            createAsync(path, data, createMode, cb);
+        }
+    }
+
+    /**
+     * Asynchronous deletion of a znode with any version ({@code -1}). The operation serves as its own
+     * result callback: a missing node is ignored, a retriable error puts the operation into
+     * {@code retryQ}, session expiration is only logged.
+     */
+    class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, ZkAsyncOperation {
+        /** Path to delete. */
+        private final String path;
+
+        /**
+         * @param path Path.
+         */
+        DeleteIfExistsOperation(String path) {
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            zk.delete(path, -1, this, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx) {
+            // Nothing to do when the client is closing or the node is already gone.
+            if (closing || rc == KeeperException.Code.NONODE.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [" +
+                    "path=" + path + ']');
+
+                retryQ.add(this);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            assert rc == 0 : KeeperException.Code.get(rc);
+        }
+    }
+
+    /**
+     * Callback for asynchronous {@code create}: an already existing node is ignored, a retriable error
+     * puts the operation into {@code retryQ}, session expiration is only logged, any other result is
+     * passed to the user callback if there is one.
+     */
+    class CreateCallbackWrapper implements AsyncCallback.StringCallback {
+        /** Wrapped operation. */
+        final CreateOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        CreateCallbackWrapper(CreateOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, String name) {
+            // Nothing to do when the client is closing or the node already exists.
+            if (closing || rc == KeeperException.Code.NODEEXISTS.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            AsyncCallback.StringCallback userCb = op.cb;
+
+            if (userCb != null)
+                userCb.processResult(rc, path, ctx, name);
+        }
+    }
+
+    /**
+     * Callback for asynchronous {@code getChildren}: a retriable error puts the operation into
+     * {@code retryQ}, session expiration is only logged, any other result is passed to the user callback.
+     */
+    class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback {
+        /** Wrapped operation. */
+        private final GetChildrenOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private ChildrenCallbackWrapper(GetChildrenOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, children, stat);
+        }
+    }
+
+    /**
+     * Callback for asynchronous {@code getData}: a retriable error puts the operation into
+     * {@code retryQ}, session expiration is only logged, any other result is passed to the user callback.
+     */
+    class DataCallbackWrapper implements AsyncCallback.DataCallback {
+        /** Wrapped operation. */
+        private final GetDataOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private DataCallbackWrapper(GetDataOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, data, stat);
+        }
+    }
+
+    /**
+     * Callback for asynchronous {@code exists}: a retriable error puts the operation into
+     * {@code retryQ}, session expiration is only logged, any other result is passed to the user callback.
+     */
+    class StatCallbackWrapper implements AsyncCallback.StatCallback {
+        /** Wrapped operation. */
+        private final ExistsOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private StatCallbackWrapper(ExistsOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+
+                return;
+            }
+
+            if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+                U.warn(log, "Failed to execute async operation, connection lost [path=" + path + ']');
+
+                return;
+            }
+
+            op.cb.processResult(rc, path, ctx, stat);
+        }
+    }
+
+    /**
+     * Timer task which declares the connection lost if the client is still in {@code Disconnected}
+     * state for the same {@code connStartTime} the task was created with.
+     */
+    private class ConnectionTimeoutTask extends TimerTask {
+        /** Value of {@code connStartTime} at the moment the task was created. */
+        private final long connectStartTime;
+
+        /**
+         * @param connectStartTime Time was connection started.
+         */
+        ConnectionTimeoutTask(long connectStartTime) {
+            this.connectStartTime = connectStartTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            synchronized (stateMux) {
+                if (closing)
+                    return;
+
+                boolean sameDisconnect = state == ConnectionState.Disconnected &&
+                    ZookeeperClient.this.connStartTime == connectStartTime;
+
+                // State has changed since the task was scheduled, the timeout is not relevant anymore.
+                if (!sameDisconnect)
+                    return;
+
+                state = ConnectionState.Lost;
+
+                U.warn(log, "Failed to establish ZooKeeper connection, close client " +
+                    "[timeout=" + connLossTimeout + ']');
+            }
+
+            // Close and notify after the monitor is released.
+            closeClient();
+
+            notifyConnectionLost();
+        }
+    }
+
+    /**
+     * Client connection state, read and changed under {@code stateMux} in the error handling code.
+     */
+    private enum ConnectionState {
+        /** State from which the first retriable operation error moves the client to {@link #Disconnected}. */
+        Connected,
+        /** Retriable errors are being retried; becomes {@link #Lost} once {@code connLossTimeout} elapses. */
+        Disconnected,
+        /** Set on timeout or a non-retriable error; failed operations are no longer retried. */
+        Lost
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4805be06/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
new file mode 100644
index 0000000..01d011b
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ * Checked exception carrying either a description of a ZooKeeper client failure or the error that caused it.
+ */
+class ZookeeperClientFailedException extends Exception {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an exception with a description only.
+     *
+     * @param msg Message.
+     */
+    ZookeeperClientFailedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception for the given underlying error.
+     *
+     * @param cause Cause.
+     */
+    ZookeeperClientFailedException(Throwable cause) {
+        super(cause);
+    }
+}

Reply via email to