Repository: ignite
Updated Branches:
  refs/heads/ignite-zk f76526396 -> ce7f0cbcf


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce7f0cbc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce7f0cbc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce7f0cbc

Branch: refs/heads/ignite-zk
Commit: ce7f0cbcff948852cca4827c42ff021a6c7c62be
Parents: f765263
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Nov 23 16:00:10 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Nov 23 16:12:06 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZookeeperClient.java  | 80 ++++++++++++++++++--
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 19 +++--
 .../zk/internal/ZookeeperClientTest.java        | 22 ++++++
 .../ZookeeperDiscoverySpiBasicTest.java         |  4 +
 4 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index d4d23ee..9125982 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -29,7 +29,6 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
@@ -206,6 +205,12 @@ public class ZookeeperClient implements Watcher {
             connLostC.run();
     }
 
+    /**
+     * @param path Path.
+     * @return {@code True} if node exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
     boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException {
         for (;;) {
             long connStartTime = this.connStartTime;
@@ -219,8 +224,16 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
-    void createAllIfNeeded(List<String> paths, CreateMode createMode)
-        throws ZookeeperClientFailedException, InterruptedException
+    /**
+     *
+     * @param paths Paths to create.
+     * @param createMode Create mode.
+     * @throws KeeperException.NodeExistsException If at least one of target node already exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void createAll(List<String> paths, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException
     {
         // TODO ZK: need check for max size?
         List<Op> ops = new ArrayList<>(paths.size());
@@ -282,7 +295,7 @@ public class ZookeeperClient implements Watcher {
             long connStartTime = this.connStartTime;
 
             try {
-                if (first) {
+                if (!first) {
                     List<String> children = zk.getChildren(dir, false);
 
                     for (int i = 0; i < children.size(); i++) {
@@ -315,6 +328,12 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @param path Path.
+     * @return Children nodes.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
     List<String> getChildren(String path)
         throws ZookeeperClientFailedException, InterruptedException
     {
@@ -371,6 +390,13 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @param path Path.
+     * @param ver Version.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
     void delete(String path, int ver)
         throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
     {
@@ -391,6 +417,13 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param ver Version.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
     void setData(String path, byte[] data, int ver)
         throws ZookeeperClientFailedException, InterruptedException
     {
@@ -411,6 +444,13 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @param path Path.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
     byte[] getData(String path)
         throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
     {
@@ -429,24 +469,45 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
     void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) {
         ExistsOperation op = new ExistsOperation(path, watcher, cb);
 
         zk.exists(path, watcher, new StatCallbackWrapper(op), null);
     }
 
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
     void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) {
         GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb);
 
         zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null);
     }
 
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
     void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) {
         GetDataOperation op = new GetDataOperation(path, watcher, cb);
 
         zk.getData(path, watcher, new DataCallbackWrapper(op), null);
     }
 
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param createMode Create mode.
+     * @param cb Callback.
+     */
     void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
         if (data == null)
             data = EMPTY_BYTES;
@@ -547,8 +608,9 @@ public class ZookeeperClient implements Watcher {
      * @return {@code True} if can retry operation.
      */
     private boolean needRetry(int code) {
-        // TODO ZL: other codes.
-        return code == KeeperException.Code.CONNECTIONLOSS.intValue();
+        return code == KeeperException.Code.CONNECTIONLOSS.intValue() ||
+            code == KeeperException.Code.SESSIONMOVED.intValue() ||
+            code == KeeperException.Code.OPERATIONTIMEOUT.intValue();
     }
 
     /**
@@ -685,6 +747,12 @@ public class ZookeeperClient implements Watcher {
         /** */
         private final AsyncCallback.StringCallback cb;
 
+        /**
+         * @param path path.
+         * @param data Data.
+         * @param createMode Create mode.
+         * @param cb Callback.
+         */
         CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) {
             this.path = path;
             this.data = data;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 1cfa2eb..f9786d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -333,7 +333,6 @@ public class ZookeeperDiscoveryImpl {
      */
     private void initZkNodes() throws InterruptedException {
         try {
-            // TODO ZK: use multi.
             if (zkClient.exists(zkPaths.aliveNodesDir))
                 return; // This path is created last, assume all others dirs are created.
 
@@ -347,7 +346,15 @@ public class ZookeeperDiscoveryImpl {
             dirs.add(zkPaths.customEvtsAcksDir);
             dirs.add(zkPaths.aliveNodesDir);
 
-            zkClient.createAllIfNeeded(dirs, PERSISTENT);
+            try {
+                zkClient.createAll(dirs, PERSISTENT);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                U.warn(log, "Failed to create nodes using bulk operation: " + e);
+
+                for (String dir : dirs)
+                    zkClient.createIfNeeded(dir, null, PERSISTENT);
+            }
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
@@ -656,14 +663,14 @@ public class ZookeeperDiscoveryImpl {
         boolean newEvts = false;
 
         for (String child : aliveNodes) {
-            Integer inernalId = ZkIgnitePaths.aliveInternalId(child);
+            Integer internalId = ZkIgnitePaths.aliveInternalId(child);
 
-            Object old = alives.put(inernalId, child);
+            Object old = alives.put(internalId, child);
 
             assert old == null;
 
-            if (!top.nodesByInternalId.containsKey(inernalId)) {
-                generateNodeJoin(curTop, inernalId, child);
+            if (!top.nodesByInternalId.containsKey(internalId)) {
+                generateNodeJoin(curTop, internalId, child);
 
                 watchAliveNodeData(child);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index 8aac456..81edcde 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -52,6 +53,27 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCreateAll() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        List<String> paths = new ArrayList<>();
+
+        paths.add("/apacheIgnite/1");
+        paths.add("/apacheIgnite/2");
+        paths.add("/apacheIgnite/3");
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(3, client.getChildren("/apacheIgnite").size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDeleteAll() throws Exception {
         startZK(1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index f20cb19..9bdf670 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -870,6 +870,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception {
+        sesTimeout = 30_000;
+
         if (closeClientSock)
             testSockNio = true;
 
@@ -986,6 +988,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void randomTopologyChanges(boolean restartZk, boolean closeClientSock) throws Exception {
+        sesTimeout = 30_000;
+
         if (closeClientSock)
             testSockNio = true;
 

Reply via email to