Revert deleteFutureData - Fixes #4236. Signed-off-by: Dmitriy Pavlov <dpav...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e1b4b9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e1b4b9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e1b4b9c

Branch: refs/heads/ignite-8446
Commit: 2e1b4b9cd250f75b09e3641794efb2d6523812b4
Parents: 4201647
Author: NSAmelchev <nsamelc...@gmail.com>
Authored: Tue Jul 24 17:34:00 2018 +0300
Committer: Dmitriy Pavlov <dpav...@apache.org>
Committed: Tue Jul 24 17:35:00 2018 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZookeeperClient.java  | 126 +++++++++++++------
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  13 +-
 .../zk/internal/ZookeeperClientTest.java        |  93 ++++++++++++++
 3 files changed, 186 insertions(+), 46 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 6cc77a5..b58f0ce 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal;

 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -286,35 +287,64 @@ public class ZookeeperClient implements Watcher {
     }

     /**
-     *
      * @param paths Paths to create.
      * @param createMode Create mode.
-     * @throws KeeperException.NodeExistsException If at least one of target node already exists.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
     void createAll(List<String> paths, CreateMode createMode)
-        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException
-    {
-        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
-        List<Op> ops = new ArrayList<>(paths.size());
+        throws ZookeeperClientFailedException, InterruptedException {
+        if (paths.isEmpty())
+            return;

-        for (String path : paths)
-            ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+        List<List<Op>> batches = new LinkedList<>();

-        for (;;) {
-            long connStartTime = this.connStartTime;
+        int batchSize = 0;

-            try {
-                zk.multi(ops);
+        List<Op> batch = new LinkedList<>();

-                return;
-            }
-            catch (KeeperException.NodeExistsException e) {
-                throw e;
+        for (String path : paths) {
+            //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187
+            int size = requestOverhead(path) + 48 /* overhead */;
+
+            assert size <= MAX_REQ_SIZE;
+
+            if (batchSize + size > MAX_REQ_SIZE) {
+                batches.add(batch);
+
+                batch = new LinkedList<>();
+
+                batchSize = 0;
             }
-            catch (Exception e) {
-                onZookeeperError(connStartTime, e);
+
+            batch.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+
+            batchSize += size;
+        }
+
+        batches.add(batch);
+
+        for (List<Op> ops : batches) {
+            for (;;) {
+                long connStartTime = this.connStartTime;
+
+                try {
+                    zk.multi(ops);
+
+                    break;
+                }
+                catch (KeeperException.NodeExistsException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to create nodes using bulk operation: " + e);
+
+                    for (Op op : ops)
+                        createIfNeeded(op.getPath(), null, createMode);
+
+                    break;
+                }
+                catch (Exception e) {
+                    onZookeeperError(connStartTime, e);
+                }
             }
         }
     }
@@ -560,38 +590,64 @@ public class ZookeeperClient implements Watcher {
      * @param parent Parent path.
      * @param paths Children paths.
      * @param ver Version.
-     * @throws KeeperException.NoNodeException If at least one of nodes does not exist.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
     void deleteAll(@Nullable String parent, List<String> paths, int ver)
-        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
-    {
+        throws ZookeeperClientFailedException, InterruptedException {
         if (paths.isEmpty())
             return;

-        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
-        List<Op> ops = new ArrayList<>(paths.size());
+        List<List<Op>> batches = new LinkedList<>();
+
+        int batchSize = 0;
+
+        List<Op> batch = new LinkedList<>();

         for (String path : paths) {
             String path0 = parent != null ? parent + "/" + path : path;

-            ops.add(Op.delete(path0, ver));
-        }
+            //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187
+            int size = requestOverhead(path0) + 17 /* overhead */;

-        for (;;) {
-            long connStartTime = this.connStartTime;
+            assert size <= MAX_REQ_SIZE;

-            try {
-                zk.multi(ops);
+            if (batchSize + size > MAX_REQ_SIZE) {
+                batches.add(batch);

-                return;
-            }
-            catch (KeeperException.NoNodeException e) {
-                throw e;
+                batch = new LinkedList<>();
+
+                batchSize = 0;
             }
-            catch (Exception e) {
-                onZookeeperError(connStartTime, e);
+
+            batch.add(Op.delete(path0, ver));
+
+            batchSize += size;
+        }
+
+        batches.add(batch);
+
+        for (List<Op> ops : batches) {
+            for (;;) {
+                long connStartTime = this.connStartTime;
+
+                try {
+                    zk.multi(ops);
+
+                    break;
+                }
+                catch (KeeperException.NoNodeException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to delete nodes using bulk operation: " + e);
+
+                    for (Op op : ops)
+                        deleteIfExists(op.getPath(), ver);
+
+                    break;
+                }
+                catch (Exception e) {
+                    onZookeeperError(connStartTime, e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 8b17622..dc687b8 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -814,17 +814,8 @@ public class ZookeeperDiscoveryImpl {
                 dirs.add(dir);
             }

-            try {
-                if (!dirs.isEmpty())
-                    client.createAll(dirs, PERSISTENT);
-            }
-            catch (KeeperException.NodeExistsException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to create nodes using bulk operation: " + e);
-
-                for (String dir : dirs)
-                    client.createIfNeeded(dir, null, PERSISTENT);
-            }
+            if (!dirs.isEmpty())
+                client.createAll(dirs, PERSISTENT);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index d228e03..0d64980 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -142,6 +142,51 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCreateAllRequestOverflow() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        int cnt = 20_000;
+
+        List<String> paths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            paths.add("/apacheIgnite/" + i);
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(cnt, client.getChildren("/apacheIgnite").size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateAllNodeExists() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+
+        List<String> paths = new ArrayList<>();
+
+        paths.add("/apacheIgnite/1");
+        paths.add("/apacheIgnite/2");
+        paths.add("/apacheIgnite/3");
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(3, client.getChildren("/apacheIgnite").size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDeleteAll() throws Exception {
         startZK(1);

@@ -164,6 +209,54 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDeleteAllRequestOverflow() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        int cnt = 30_000;
+
+        List<String> paths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            paths.add("/apacheIgnite/" + i);
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(cnt, client.getChildren("/apacheIgnite").size());
+
+        List<String> subPaths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            subPaths.add(String.valueOf(i));
+
+        client.deleteAll("/apacheIgnite", subPaths, -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeleteAllNoNode() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/2", null, CreateMode.PERSISTENT);
+
+        client.deleteAll("/apacheIgnite", Arrays.asList("1", "2", "3"), -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionLoss1() throws Exception {
         ZookeeperClient client = new ZookeeperClient(log, "localhost:2200", 3000, null);