Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 5793ffbe2 -> 45e7e4060


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/45e7e406
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/45e7e406
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/45e7e406

Branch: refs/heads/ignite-zk
Commit: 45e7e40603fa2b496b94f00fc287221d30073c98
Parents: 5793ffb
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Nov 20 11:14:36 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Nov 20 13:19:00 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   2 +-
 .../discovery/zk/internal/ZkClusterNodes.java   |  17 +
 .../zk/internal/ZkDiscoveryEventData.java       |   5 -
 .../internal/ZkDiscoveryNodeFailEventData.java  |  43 +++
 .../ZkDiscoveryNodeFailedEventData.java         |  34 --
 .../internal/ZkDiscoveryNodeJoinEventData.java  |  52 +++
 .../ZkDiscoveryNodeJoinedEventData.java         |  35 --
 .../zk/internal/ZkJoinEventDataForJoined.java   |   8 +
 .../spi/discovery/zk/internal/ZkPaths.java      |  13 +-
 .../discovery/zk/internal/ZookeeperClient.java  |  95 +++++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 352 +++++++++++++++----
 .../java/org/apache/ZookeeperNodeStart.java     |   2 +-
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |   2 +-
 .../zk/internal/ZookeeperClientTest.java        |  16 +-
 14 files changed, 524 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 16407d2..92c4f54 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -146,7 +146,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
 
     /** {@inheritDoc} */
     @Override public ClusterNode getLocalNode() {
-        return impl.localNode();
+        return impl != null ? impl.localNode() : null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
index 084e907..e3e5f8b 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
@@ -72,4 +72,21 @@ public class ZkClusterNodes {
 
         assert old == null : old;
     }
+
+    ZookeeperClusterNode removeNode(int internalId) {
+        ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
+
+        assert node != null : internalId;
+        assert node.order() > 0 : node;
+
+        Object rvmd = nodesByOrder.remove(node.order());
+
+        assert rvmd != null;
+
+        rvmd = nodesById.remove(node.id());
+
+        assert rvmd != null;
+
+        return node;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index 32752e8..fb05d14 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -58,9 +58,4 @@ abstract class ZkDiscoveryEventData implements Serializable {
     long topologyVersion() {
         return topVer;
     }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(ZkDiscoveryEventData.class, this);
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
new file mode 100644
index 0000000..d7664d6
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailEventData.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.events.EventType;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeFailEventData extends ZkDiscoveryEventData {
+    /** */
+    private int failedNodeInternalId;
+
+    ZkDiscoveryNodeFailEventData(long topVer, int failedNodeInternalId) {
+        super(EventType.EVT_NODE_FAILED, topVer);
+
+        this.failedNodeInternalId = failedNodeInternalId;
+    }
+
+    int failedNodeInternalId() {
+        return failedNodeInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NodeFailEventData [topVer=" + topologyVersion() + ", nodeId=" 
+ failedNodeInternalId + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailedEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailedEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailedEventData.java
deleted file mode 100644
index fe86e64..0000000
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeFailedEventData.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import org.apache.ignite.events.EventType;
-
-/**
- *
- */
-class ZkDiscoveryNodeFailedEventData extends ZkDiscoveryEventData {
-    /** */
-    private int failedNodeInternalId;
-
-    public ZkDiscoveryNodeFailedEventData(long topVer, int 
failedNodeInternalId) {
-        super(EventType.EVT_NODE_FAILED, topVer);
-
-        this.failedNodeInternalId = failedNodeInternalId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
new file mode 100644
index 0000000..36e37a2
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinEventData.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.events.EventType;
+
+/**
+ *
+ */
+class ZkDiscoveryNodeJoinEventData extends ZkDiscoveryEventData {
+    /** */
+    final int joinedInternalId;
+
+    /** */
+    final UUID nodeId;
+
+    /** */
+    transient ZkJoiningNodeData joiningNodeData;
+
+    /**
+     * @param topVer Topology version.
+     * @param nodeId Joined node ID.
+     * @param joinedInternalId Joined node internal ID.
+     */
+    ZkDiscoveryNodeJoinEventData(long topVer, UUID nodeId, int 
joinedInternalId) {
+        super(EventType.EVT_NODE_JOINED, topVer);
+
+        this.nodeId = nodeId;
+        this.joinedInternalId = joinedInternalId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NodeJoinEventData [topVer=" + topologyVersion() + ", node=" + 
nodeId + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinedEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinedEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinedEventData.java
deleted file mode 100644
index 867f9c7..0000000
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryNodeJoinedEventData.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-import org.apache.ignite.events.EventType;
-
-/**
- *
- */
-class ZkDiscoveryNodeJoinedEventData extends ZkDiscoveryEventData {
-    /** */
-    final UUID nodeId;
-
-    ZkDiscoveryNodeJoinedEventData(long topVer, UUID nodeId) {
-        super(EventType.EVT_NODE_JOINED, topVer);
-
-        this.nodeId = nodeId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
index 538679a..cdbfdc0 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoinEventDataForJoined.java
@@ -42,4 +42,12 @@ class ZkJoinEventDataForJoined implements Serializable {
         this.top = top;
         this.discoData = discoData;
     }
+
+    List<ZookeeperClusterNode> topology() {
+        return top;
+    }
+
+    Map<Integer, Serializable> discoveryData() {
+        return discoData;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
index 57e7117..394ba59 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
@@ -24,15 +24,12 @@ import java.util.UUID;
  */
 class ZkPaths {
     /** */
-    static final int ID_LEN = 36;
+    private static final int UUID_LEN = 36;
 
     /** */
     private static final String JOIN_DATA_DIR = "joinData";
 
     /** */
-    private static final String JOIN_EVENTS_DATA_PATH = "joinEvents";
-
-    /** */
     private static final String ALIVE_NODES_DIR = "alive";
 
     /** */
@@ -45,10 +42,10 @@ class ZkPaths {
     private final String clusterName;
 
     /** */
-    final String aliveNodesDir;
+    final String clusterDir;
 
     /** */
-    final String joinEvtsDataDir;
+    final String aliveNodesDir;
 
     /** */
     final String joinDataDir;
@@ -64,9 +61,9 @@ class ZkPaths {
         this.basePath = basePath;
         this.clusterName = clusterName;
 
+        clusterDir = basePath + "/" + clusterName;
         aliveNodesDir = zkPath(ALIVE_NODES_DIR);
         joinDataDir = zkPath(JOIN_DATA_DIR);
-        joinEvtsDataDir = zkPath(JOIN_EVENTS_DATA_PATH);
         evtsPath = zkPath(DISCO_EVENTS_PATH);
     }
 
@@ -85,7 +82,7 @@ class ZkPaths {
     }
 
     static UUID aliveNodeId(String path) {
-        String idStr = path.substring(0, ZkPaths.ID_LEN);
+        String idStr = path.substring(0, ZkPaths.UUID_LEN);
 
         return UUID.fromString(idStr);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 0b17cab..95d548b 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -136,6 +136,9 @@ public class ZookeeperClient implements Watcher {
                     return;
                 }
 
+                if (!zk.getState().isAlive())
+                    return;
+
                 Event.KeeperState zkState = evt.getState();
 
                 switch (zkState) {
@@ -249,9 +252,32 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    void delete(String path, int ver)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
+    {
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.delete(path, ver);
+
+                return;
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
     void setData(String path, byte[] data, int ver)
         throws ZookeeperClientFailedException, InterruptedException
     {
+        if (data == null)
+            data = EMPTY_BYTES;
+
         for (;;) {
             long connStartTime = this.connStartTime;
 
@@ -284,6 +310,12 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback 
cb) {
+        ExistsOperation op = new ExistsOperation(path, watcher, cb);
+
+        zk.exists(path, watcher, new StatCallbackWrapper(op), null);
+    }
+
     void getChildrenAsync(String path, Watcher watcher, 
AsyncCallback.Children2Callback cb) {
         GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb);
 
@@ -438,6 +470,11 @@ public class ZookeeperClient implements Watcher {
         /** */
         private final AsyncCallback.DataCallback cb;
 
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
         GetDataOperation(String path, Watcher watcher, 
AsyncCallback.DataCallback cb) {
             this.path = path;
             this.watcher = watcher;
@@ -453,6 +490,36 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    class ExistsOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final Watcher watcher;
+
+        /** */
+        private final AsyncCallback.StatCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        ExistsOperation(String path, Watcher watcher, 
AsyncCallback.StatCallback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            existsAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     *
+     */
     private void closeClient() {
         try {
             zk.close();
@@ -532,6 +599,34 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    class StatCallbackWrapper implements AsyncCallback.StatCallback {
+        /** */
+        private final ExistsOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private StatCallbackWrapper(ExistsOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else
+                op.cb.processResult(rc, path, ctx, stat);
+        }
+    }
+
+    /**
+     *
+     */
     private class ConnectionTimeoutTask extends TimerTask {
         /** */
         private final long connectStartTime;

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 14506a1..4378c6f 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -51,7 +51,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.zookeeper.CreateMode.EPHEMERAL;
 import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
 import static org.apache.zookeeper.CreateMode.PERSISTENT;
 
@@ -98,13 +97,21 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private long gridStartTime;
 
+    /**
+     * @param log
+     * @param basePath
+     * @param clusterName
+     * @param locNode
+     * @param lsnr
+     * @param exchange
+     */
     public ZookeeperDiscoveryImpl(IgniteLogger log,
         String basePath,
         String clusterName,
         ZookeeperClusterNode locNode,
         DiscoverySpiListener lsnr,
         DiscoverySpiDataExchange exchange) {
-        assert locNode.id() != null : locNode;
+        assert locNode.id() != null && locNode.isLocal() : locNode;
 
         if (F.isEmpty(clusterName))
             throw new IllegalArgumentException("Cluster name is empty.");
@@ -245,6 +252,8 @@ public class ZookeeperDiscoveryImpl {
 
             zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT);
 
+            zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
+
             zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT);
 
             zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT);
@@ -269,9 +278,9 @@ public class ZookeeperDiscoveryImpl {
             // TODO ZK: handle max size.
             String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + 
locNode.id() + "|", joinDataBytes, EPHEMERAL_SEQUENTIAL);
 
-            int seqNum = Integer.parseInt(path.substring(ZkPaths.ID_LEN + 2));
+            int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') 
+ 1));
 
-            zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() 
+ "|" + seqNum + "|", null, EPHEMERAL);
+            zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() 
+ "|" + seqNum + "|", null, EPHEMERAL_SEQUENTIAL);
 
             zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
                 @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
@@ -301,45 +310,99 @@ public class ZookeeperDiscoveryImpl {
 
             TreeMap<Integer, String> alives = new TreeMap<>();
 
-            for (String child : aliveNodes)
-                alives.put(ZkPaths.aliveInternalId(child), child);
+            Integer locInternalId = null;
+
+            for (String aliveNodePath : aliveNodes) {
+                Integer internalId = ZkPaths.aliveInternalId(aliveNodePath);
+
+                alives.put(internalId, aliveNodePath);
+
+                if (locInternalId == null) {
+                    UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath);
+
+                    if (locNode.id().equals(nodeId))
+                        locInternalId = internalId;
+                }
+            }
 
             assert !alives.isEmpty();
+            assert locInternalId != null;
 
             Map.Entry<Integer, String> crdE = alives.firstEntry();
 
-            UUID crdId = ZkPaths.aliveNodeId(crdE.getValue());
-
-            if (crdId.equals(locNode.id())) {
-                byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
+            if (locInternalId.equals(crdE.getKey()))
+                onBecomeCoordinator(locInternalId);
+            else {
+                assert alives.size() > 1;
+
+                Map.Entry<Integer, String> prevE = 
alives.floorEntry(locInternalId - 1);
+
+                assert prevE != null;
+
+                final int locInternalId0 = locInternalId;
+
+                zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), new Watcher() {
+                    @Override public void process(WatchedEvent evt) {
+                        if (evt.getType() == Event.EventType.NodeDeleted) {
+                            try {
+                                onPreviousCoordinatorFail(locInternalId0);
+                            }
+                            catch (Throwable e) {
+                                onFatalError(e);
+                            }
+                        }
+                    }
+                }, new AsyncCallback.StatCallback() {
+                    @Override public void processResult(int rc, String path, 
Object ctx, Stat stat) {
+                        assert rc == 0 : rc;
+
+                        if (stat == null) {
+                            try {
+                                onPreviousCoordinatorFail(locInternalId0);
+                            }
+                            catch (Throwable e) {
+                                onFatalError(e);
+                            }
+                        }
+                    }
+                });
+            }
+        }
+        catch (Throwable e) {
+            onFatalError(e);
+        }
+    }
 
-                if (evtsData.length > 0)
-                    onEventsUpdate(evtsData, null);
+    private void onPreviousCoordinatorFail(int locInternalId) throws Exception 
{
+        if (log.isInfoEnabled())
+            log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
 
-                crd = true;
+        onBecomeCoordinator(locInternalId);
+    }
 
-                if (joined) {
-                    log.info("Node connected to zookeeper, node is new 
discovery coordinator " +
-                        "[locId=" + locNode.id() + ']');
+    private void onBecomeCoordinator(int locInternalId) throws Exception {
+        byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
 
-                    assert locNode.order() > 0 : locNode;
-                    assert evts != null;
-                }
-                else {
-                    log.info("Node connected to zookeeper, node is first 
cluster node [locId=" + locNode.id() + ']');
+        if (evtsData.length > 0)
+            onEventsUpdate(evtsData, null);
 
-                    newClusterStarted(crdE.getKey());
-                }
+        crd = true;
 
-                zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, 
childrenCallback);
-            }
-            else {
+        if (joined) {
+            if (log.isInfoEnabled())
+                log.info("Node is new discovery coordinator [locId=" + 
locNode.id() + ']');
 
-            }
+            assert locNode.order() > 0 : locNode;
+            assert evts != null;
         }
-        catch (Throwable e) {
-            onFatalError(e);
+        else {
+            if (log.isInfoEnabled())
+                log.info("Node is first cluster node [locId=" + locNode.id() + 
']');
+
+            newClusterStarted(locInternalId);
         }
+
+        zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, 
childrenCallback);
     }
 
     private void generateTopologyEvents(List<String> aliveNodes) throws 
Exception {
@@ -359,12 +422,12 @@ public class ZookeeperDiscoveryImpl {
             assert old == null;
 
             if (!top.nodesByInternalId.containsKey(inernalId))
-                processNodeJoin(curTop, inernalId, child);
+                generateNodeJoin(curTop, inernalId, child);
         }
 
         for (Map.Entry<Integer, ZookeeperClusterNode> e : 
top.nodesByInternalId.entrySet()) {
             if (!alives.containsKey(e.getKey()))
-                processNodeFail(curTop, e.getValue());
+                generateNodeFail(curTop, e.getValue());
         }
 
         if (evts.evts.size() > evtCnt) {
@@ -378,17 +441,23 @@ public class ZookeeperDiscoveryImpl {
 
             if (log.isInfoEnabled())
                 log.info("Discovery coordinator saved new events [topVer=" + 
evts.topVer + ", saveTime=" + time + ']');
+
+            onEventsUpdate(evts);
         }
     }
 
-    private void processNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, 
ZookeeperClusterNode failedNode) {
+    /**
+     * @param curTop Current topology.
+     * @param failedNode Failed node.
+     */
+    private void generateNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, 
ZookeeperClusterNode failedNode) {
         Object rmvd = curTop.remove(failedNode.order());
 
         assert rmvd != null;
 
         evts.topVer++;
 
-        ZkDiscoveryEventData evtData = new 
ZkDiscoveryNodeFailedEventData(evts.topVer, failedNode.internalId());
+        ZkDiscoveryEventData evtData = new 
ZkDiscoveryNodeFailEventData(evts.topVer, failedNode.internalId());
 
         evts.addEvent(evtData);
 
@@ -398,7 +467,7 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void processNodeJoin(TreeMap<Long, ZookeeperClusterNode> curTop,
+    private void generateNodeJoin(TreeMap<Long, ZookeeperClusterNode> curTop,
         int internalId,
         String aliveNodePath)
         throws Exception
@@ -406,7 +475,7 @@ public class ZookeeperDiscoveryImpl {
         UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath);
         int joinSeq = ZkPaths.aliveJoinSequence(aliveNodePath);
 
-        String joinDataPath = zkPaths.zkPath(zkPaths.joinDataDir + '/' + 
nodeId.toString() + "|" + joinSeq);
+        String joinDataPath = zkPaths.joinDataDir + '/' + nodeId.toString() + 
"|" + String.format("%010d", joinSeq);
 
         byte[] joinData;
 
@@ -414,7 +483,7 @@ public class ZookeeperDiscoveryImpl {
             joinData = zkClient.getData(joinDataPath);
         }
         catch (KeeperException.NoNodeException e) {
-            U.warn(log, "Failed to read joinin node data, node left before 
join process finished: " + nodeId);
+            U.warn(log, "Failed to read joining node data, node left before 
join process finished: " + nodeId);
 
             return;
         }
@@ -431,10 +500,6 @@ public class ZookeeperDiscoveryImpl {
         joinedNode.order(evts.topVer);
         joinedNode.internalId(internalId);
 
-        Object old = curTop.put(joinedNode.order(), joinedNode);
-
-        assert old == null;
-
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
 
         joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
@@ -447,10 +512,19 @@ public class ZookeeperDiscoveryImpl {
 
         Map<Integer, Serializable> commonData = collectBag.commonData();
 
-        ZkJoinEventDataForJoined dataForJoined = new 
ZkJoinEventDataForJoined(new ArrayList<>(curTop.values()),
+        ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
+            new ArrayList<>(curTop.values()),
             commonData);
 
-        ZkDiscoveryEventData evtData = new 
ZkDiscoveryNodeJoinedEventData(evts.topVer, joinedNode.id());
+        Object old = curTop.put(joinedNode.order(), joinedNode);
+
+        assert old == null;
+
+        ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(evts.topVer,
+            joinedNode.id(),
+            joinedNode.internalId());
+
+        evtData.joiningNodeData = joiningNodeData;
 
         evts.addEvent(evtData);
 
@@ -476,14 +550,14 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void newClusterStarted(int locInternalId) throws Exception {
+        cleanupPreviousClusterData();
+
         joined = true;
 
         gridStartTime = U.currentTimeMillis();
 
         evts = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, 
ZkDiscoveryEventData>());
 
-        // TODO ZK: old data cleanup.
-
         locNode.internalId(locInternalId);
         locNode.order(1);
 
@@ -499,6 +573,24 @@ public class ZookeeperDiscoveryImpl {
         joinFut.onDone();
     }
 
+    private void cleanupPreviousClusterData() throws Exception {
+        // TODO ZK: use multi.
+        zkClient.setData(zkPaths.evtsPath, null, -1);
+
+        for (String evtPath : zkClient.getChildren(zkPaths.evtsPath)) {
+            String evtDir = zkPaths.evtsPath + "/" + evtPath;
+
+            removeChildren(evtDir);
+
+            zkClient.delete(evtDir, -1);
+        }
+    }
+
+    private void removeChildren(String path) throws Exception {
+        for (String childPath : zkClient.getChildren(path))
+            zkClient.delete(path + "/" + childPath, -1);
+    }
+
     /**
      * @param children
      * @param stat
@@ -513,37 +605,166 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private boolean joined;
 
-    /**
-     * @param data
-     * @param stat
-     */
-    private void onEventsUpdate(byte[] data, Stat stat) throws 
IgniteCheckedException {
+    private void onEventsUpdate(byte[] data, Stat stat) throws Exception {
+        if (data.length == 0)
+            return;
+
+        assert !crd;
+
         ZkDiscoveryEventsData evtsData = unmarshal(data);
 
+        onEventsUpdate(evtsData);
+
+        evts = evtsData;
+    }
+
+    /**
+     * @param evtsData Events.
+     */
+    private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws 
Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
-        for (Map.Entry<Long, ZkDiscoveryEventData> e : 
evts.tailMap(lastProcEvt).entrySet()) {
+        for (Map.Entry<Long, ZkDiscoveryEventData> e : 
evts.tailMap(lastProcEvt, false).entrySet()) {
             ZkDiscoveryEventData evtData = e.getValue();
 
+            if (log.isInfoEnabled()) {
+                log.info("New discovery event data: " + evtData + ']');
+            }
+
             if (!joined) {
                 if (evtData.eventType() != EventType.EVT_NODE_JOINED)
                     continue;
 
-                UUID joinedId = 
((ZkDiscoveryNodeJoinedEventData)evtData).nodeId;
+                ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
+
+                UUID joinedId = ((ZkDiscoveryNodeJoinEventData)evtData).nodeId;
 
                 boolean locJoin = evtData.eventType() == 
EventType.EVT_NODE_JOINED &&
                     locNode.id().equals(joinedId);
 
                 if (locJoin) {
+                    String path = zkPaths.evtsPath + "/" + 
evtData.topologyVersion() + "/joined";
+
+                    ZkJoinEventDataForJoined dataForJoined = 
unmarshal(zkClient.getData(path));
+
+                    gridStartTime = evtsData.gridStartTime;
+
+                    locNode.internalId(evtData0.joinedInternalId);
+                    locNode.order(evtData.topologyVersion());
+
+                    DiscoveryDataBag dataBag = new 
DiscoveryDataBag(locNode.id());
+
+                    dataBag.commonData(dataForJoined.discoveryData());
+
+                    exchange.onExchange(dataBag);
+
+                    List<ZookeeperClusterNode> allNodes = 
dataForJoined.topology();
+
+                    for (ZookeeperClusterNode node : allNodes)
+                        top.addNode(node);
+
+                    top.addNode(locNode);
+
+                    List<ClusterNode> topSnapshot = new 
ArrayList<>((Collection)top.nodesByOrder.values());
+
+                    lsnr.onDiscovery(evtData.eventType(),
+                        evtData.topologyVersion(),
+                        locNode,
+                        topSnapshot,
+                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                        null);
+
+                    joinFut.onDone();
+
+                    joined = true;
+                }
+            }
+            else {
+                switch (evtData.eventType()) {
+                    case EventType.EVT_NODE_JOINED: {
+                        ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
+
+                        ZkJoiningNodeData joiningData;
+
+                        if (!crd) {
+                            String path = zkPaths.evtsPath + "/" + 
evtData.topologyVersion();
+
+                            joiningData = unmarshal(zkClient.getData(path));
+
+                            DiscoveryDataBag dataBag = new 
DiscoveryDataBag(evtData0.nodeId);
+
+                            
dataBag.joiningNodeData(joiningData.discoveryData());
+
+                            exchange.onExchange(dataBag);
+                        }
+                        else {
+                            assert evtData0.joiningNodeData != null;
+
+                            joiningData = evtData0.joiningNodeData;
+                        }
 
+                        notifyNodeJoin(evtData0, joiningData);
+
+                        break;
+                    }
+
+                    case EventType.EVT_NODE_FAILED: {
+                        
notifyNodeFail((ZkDiscoveryNodeFailEventData)e.getValue());
+
+                        break;
+                    }
+
+                    default:
+                        assert false : "Invalid event: " + evtData;
                 }
             }
 
-            lastProcEvt = e.getKey();
+            if (joined)
+                lastProcEvt = e.getKey();
         }
     }
 
     /**
+     * @param evtData Event data.
+     * @param joiningData Joining node data.
+     */
+    private void notifyNodeJoin(ZkDiscoveryNodeJoinEventData evtData, 
ZkJoiningNodeData joiningData) {
+        ZookeeperClusterNode joinedNode = joiningData.node();
+
+        joinedNode.order(evtData.topologyVersion());
+        joinedNode.internalId(evtData.joinedInternalId);
+
+        top.addNode(joinedNode);
+
+        List<ClusterNode> topSnapshot = new 
ArrayList<>((Collection)top.nodesByOrder.values());
+
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            joinedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+    }
+
+    /**
+     * @param evtData Event data.
+     */
+    private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) {
+        ZookeeperClusterNode failedNode = 
top.removeNode(evtData.failedNodeInternalId());
+
+        assert failedNode != null;
+
+        List<ClusterNode> topSnapshot = new 
ArrayList<>((Collection)top.nodesByOrder.values());
+
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            failedNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+    }
+
+    /**
      *
      */
     public void stop() {
@@ -592,6 +813,7 @@ public class ZookeeperDiscoveryImpl {
     private class ConnectionLossListener implements IgniteRunnable {
         /** {@inheritDoc} */
         @Override public void run() {
+            // TODO ZK
             U.warn(log, "Zookeeper connection loss, local node is SEGMENTED");
         }
     }
@@ -602,10 +824,20 @@ public class ZookeeperDiscoveryImpl {
     private class ZkWatcher implements Watcher {
         /** {@inheritDoc} */
         @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDataChanged)
-                zkClient.getDataAsync(evt.getPath(), this, dataCallback);
-            else if (evt.getType() == Event.EventType.NodeChildrenChanged)
-                zkClient.getChildrenAsync(evt.getPath(), this, 
childrenCallback);
+            if (evt.getType() == Event.EventType.NodeDataChanged) {
+                if (evt.getPath().equals(zkPaths.evtsPath)) {
+                    if (!crd)
+                        zkClient.getDataAsync(evt.getPath(), this, 
dataCallback);
+                }
+                else
+                    U.warn(log, "Received NodeDataChanged for unexpected path: 
" + evt.getPath());
+            }
+            else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
+                if (evt.getPath().equals(zkPaths.aliveNodesDir))
+                    zkClient.getChildrenAsync(evt.getPath(), this, 
childrenCallback);
+                else
+                    U.warn(log, "Received NodeChildrenChanged for unexpected 
path: " + evt.getPath());
+            }
         }
     }
 
@@ -621,7 +853,7 @@ public class ZookeeperDiscoveryImpl {
                 if (path.equals(zkPaths.aliveNodesDir))
                     onAliveNodesUpdate(children, stat);
                 else
-                    U.warn(log, "Children callback for unknown path: " + path);
+                    U.warn(log, "Children callback for unexpected path: " + 
path);
             }
             catch (Throwable e) {
                 onFatalError(e);
@@ -638,8 +870,10 @@ public class ZookeeperDiscoveryImpl {
             try {
                 assert rc == 0 : rc;
 
-                if (path.equals(zkPaths.evtsPath))
-                    onEventsUpdate(data, stat);
+                if (path.equals(zkPaths.evtsPath)) {
+                    if (!crd)
+                        onEventsUpdate(data, stat);
+                }
                 else
                     U.warn(log, "Data callback for unknown path: " + path);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/test/java/org/apache/ZookeeperNodeStart.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ZookeeperNodeStart.java 
b/modules/zookeeper/src/test/java/org/apache/ZookeeperNodeStart.java
index 89d6604..ef4d5f4 100644
--- a/modules/zookeeper/src/test/java/org/apache/ZookeeperNodeStart.java
+++ b/modules/zookeeper/src/test/java/org/apache/ZookeeperNodeStart.java
@@ -31,7 +31,7 @@ public class ZookeeperNodeStart {
 
             ZookeeperDiscoverySpi spi = new ZookeeperDiscoverySpi();
 
-            spi.setConnectString("localhost:2181");
+            spi.setZkConnectionString("localhost:2181");
 
             cfg.setDiscoverySpi(spi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 54dc1b4..0054641 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -722,7 +722,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
             GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
                 @Override public void apply(Integer idx) {
-                    stopGrid(idx + 3);
+                    stopGrid(idx + SRVS);
                 }
             }, THREADS, "stop-node");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/45e7e406/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index 0d4c86b..f85cf5a 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -102,12 +102,12 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
 
         final AtomicBoolean res = new AtomicBoolean();
 
-        client.getChildrenAsync("/apacheIgnite1", false, new 
AsyncCallback.Children2Callback() {
+        client.getChildrenAsync("/apacheIgnite1", null, new 
AsyncCallback.Children2Callback() {
             @Override public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
                 if (rc == 0)
                     res.set(true);
             }
-        }, null);
+        });
 
         cb.get(10_000);
 
@@ -128,7 +128,7 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
 
         final CountDownLatch l = new CountDownLatch(1);
 
-        client.getChildrenAsync("/apacheIgnite1", false, new 
AsyncCallback.Children2Callback() {
+        client.getChildrenAsync("/apacheIgnite1", null, new 
AsyncCallback.Children2Callback() {
             @Override public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
                 closeZK();
 
@@ -144,7 +144,7 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
                     fail("Unexpected error: " + e);
                 }
             }
-        }, null);
+        });
 
         assertTrue(l.await(10, TimeUnit.SECONDS));
 
@@ -196,14 +196,14 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
 
         final CountDownLatch l = new CountDownLatch(1);
 
-        client.getChildrenAsync("/apacheIgnite1", false, new 
AsyncCallback.Children2Callback() {
+        client.getChildrenAsync("/apacheIgnite1", null, new 
AsyncCallback.Children2Callback() {
             @Override public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
                 info("Callback: " + rc);
 
                 if (rc == 0)
                     l.countDown();
             }
-        }, null);
+        });
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() 
{
             @Override public Void call() throws Exception {
@@ -236,7 +236,7 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
 
         final CountDownLatch l = new CountDownLatch(1);
 
-        client.getChildrenAsync("/apacheIgnite1", false, new 
AsyncCallback.Children2Callback() {
+        client.getChildrenAsync("/apacheIgnite1", null, new 
AsyncCallback.Children2Callback() {
             @Override public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
                 try {
                     zkCluster.getServers().get(0).stop();
@@ -265,7 +265,7 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
                     fail("Unexpected error: " + e);
                 }
             }
-        }, null);
+        });
 
         assertTrue(l.await(10, TimeUnit.SECONDS));
     }

Reply via email to