Repository: ignite Updated Branches: refs/heads/ignite-zk f76526396 -> ce7f0cbcf
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce7f0cbc Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce7f0cbc Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce7f0cbc Branch: refs/heads/ignite-zk Commit: ce7f0cbcff948852cca4827c42ff021a6c7c62be Parents: f765263 Author: sboikov <sboi...@gridgain.com> Authored: Thu Nov 23 16:00:10 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Nov 23 16:12:06 2017 +0300 ---------------------------------------------------------------------- .../discovery/zk/internal/ZookeeperClient.java | 80 ++++++++++++++++++-- .../zk/internal/ZookeeperDiscoveryImpl.java | 19 +++-- .../zk/internal/ZookeeperClientTest.java | 22 ++++++ .../ZookeeperDiscoverySpiBasicTest.java | 4 + 4 files changed, 113 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java index d4d23ee..9125982 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java @@ -29,7 +29,6 @@ import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; @@ -206,6 +205,12 @@ public class ZookeeperClient implements Watcher { connLostC.run(); } + /** + * @param path Path. + * @return {@code True} if node exists. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ boolean exists(String path) throws ZookeeperClientFailedException, InterruptedException { for (;;) { long connStartTime = this.connStartTime; @@ -219,8 +224,16 @@ public class ZookeeperClient implements Watcher { } } - void createAllIfNeeded(List<String> paths, CreateMode createMode) - throws ZookeeperClientFailedException, InterruptedException + /** + * + * @param paths Paths to create. + * @param createMode Create mode. + * @throws KeeperException.NodeExistsException If at least one of target node already exists. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ + void createAll(List<String> paths, CreateMode createMode) + throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException { // TODO ZK: need check for max size? List<Op> ops = new ArrayList<>(paths.size()); @@ -282,7 +295,7 @@ public class ZookeeperClient implements Watcher { long connStartTime = this.connStartTime; try { - if (first) { + if (!first) { List<String> children = zk.getChildren(dir, false); for (int i = 0; i < children.size(); i++) { @@ -315,6 +328,12 @@ public class ZookeeperClient implements Watcher { } } + /** + * @param path Path. + * @return Children nodes. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ List<String> getChildren(String path) throws ZookeeperClientFailedException, InterruptedException { @@ -371,6 +390,13 @@ public class ZookeeperClient implements Watcher { } } + /** + * @param path Path. + * @param ver Version. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ void delete(String path, int ver) throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { @@ -391,6 +417,13 @@ public class ZookeeperClient implements Watcher { } } + /** + * @param path Path. + * @param data Data. + * @param ver Version. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ void setData(String path, byte[] data, int ver) throws ZookeeperClientFailedException, InterruptedException { @@ -411,6 +444,13 @@ public class ZookeeperClient implements Watcher { } } + /** + * @param path Path. + * @return Data. + * @throws KeeperException.NoNodeException If target node does not exist. + * @throws ZookeeperClientFailedException If connection to zk was lost. + * @throws InterruptedException If interrupted. + */ byte[] getData(String path) throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException { @@ -429,24 +469,45 @@ public class ZookeeperClient implements Watcher { } } + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback cb) { ExistsOperation op = new ExistsOperation(path, watcher, cb); zk.exists(path, watcher, new StatCallbackWrapper(op), null); } + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ void getChildrenAsync(String path, Watcher watcher, AsyncCallback.Children2Callback cb) { GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb); zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null); } + /** + * @param path Path. + * @param watcher Watcher. + * @param cb Callback. + */ void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback cb) { GetDataOperation op = new GetDataOperation(path, watcher, cb); zk.getData(path, watcher, new DataCallbackWrapper(op), null); } + /** + * @param path Path. + * @param data Data. + * @param createMode Create mode. + * @param cb Callback. + */ void createAsync(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { if (data == null) data = EMPTY_BYTES; @@ -547,8 +608,9 @@ public class ZookeeperClient implements Watcher { * @return {@code True} if can retry operation. */ private boolean needRetry(int code) { - // TODO ZL: other codes. - return code == KeeperException.Code.CONNECTIONLOSS.intValue(); + return code == KeeperException.Code.CONNECTIONLOSS.intValue() || + code == KeeperException.Code.SESSIONMOVED.intValue() || + code == KeeperException.Code.OPERATIONTIMEOUT.intValue(); } /** @@ -685,6 +747,12 @@ public class ZookeeperClient implements Watcher { /** */ private final AsyncCallback.StringCallback cb; + /** + * @param path path. + * @param data Data. + * @param createMode Create mode. + * @param cb Callback. + */ CreateOperation(String path, byte[] data, CreateMode createMode, AsyncCallback.StringCallback cb) { this.path = path; this.data = data; http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 1cfa2eb..f9786d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -333,7 +333,6 @@ public class ZookeeperDiscoveryImpl { */ private void initZkNodes() throws InterruptedException { try { - // TODO ZK: use multi. if (zkClient.exists(zkPaths.aliveNodesDir)) return; // This path is created last, assume all others dirs are created. @@ -347,7 +346,15 @@ public class ZookeeperDiscoveryImpl { dirs.add(zkPaths.customEvtsAcksDir); dirs.add(zkPaths.aliveNodesDir); - zkClient.createAllIfNeeded(dirs, PERSISTENT); + try { + zkClient.createAll(dirs, PERSISTENT); + } + catch (KeeperException.NodeExistsException e) { + U.warn(log, "Failed to create nodes using bulk operation: " + e); + + for (String dir : dirs) + zkClient.createIfNeeded(dir, null, PERSISTENT); + } } catch (ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); @@ -656,14 +663,14 @@ public class ZookeeperDiscoveryImpl { boolean newEvts = false; for (String child : aliveNodes) { - Integer inernalId = ZkIgnitePaths.aliveInternalId(child); + Integer internalId = ZkIgnitePaths.aliveInternalId(child); - Object old = alives.put(inernalId, child); + Object old = alives.put(internalId, child); assert old == null; - if (!top.nodesByInternalId.containsKey(inernalId)) { - generateNodeJoin(curTop, inernalId, child); + if (!top.nodesByInternalId.containsKey(internalId)) { + generateNodeJoin(curTop, internalId, child); watchAliveNodeData(child); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java index 8aac456..81edcde 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.zk.internal; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -52,6 +53,27 @@ public class ZookeeperClientTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCreateAll() throws Exception { + startZK(1); + + ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null); + + client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT); + + List<String> paths = new ArrayList<>(); + + paths.add("/apacheIgnite/1"); + paths.add("/apacheIgnite/2"); + paths.add("/apacheIgnite/3"); + + client.createAll(paths, CreateMode.PERSISTENT); + + assertEquals(3, client.getChildren("/apacheIgnite").size()); + } + + /** + * @throws Exception If failed. + */ public void testDeleteAll() throws Exception { startZK(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce7f0cbc/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index f20cb19..9bdf670 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -870,6 +870,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception { + sesTimeout = 30_000; + if (closeClientSock) testSockNio = true; @@ -986,6 +988,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void randomTopologyChanges(boolean restartZk, boolean closeClientSock) throws Exception { + sesTimeout = 30_000; + if (closeClientSock) testSockNio = true;