Revert deleteFutureData - Fixes #4236.

Signed-off-by: Dmitriy Pavlov <dpav...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e1b4b9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e1b4b9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e1b4b9c

Branch: refs/heads/ignite-8446
Commit: 2e1b4b9cd250f75b09e3641794efb2d6523812b4
Parents: 4201647
Author: NSAmelchev <nsamelc...@gmail.com>
Authored: Tue Jul 24 17:34:00 2018 +0300
Committer: Dmitriy Pavlov <dpav...@apache.org>
Committed: Tue Jul 24 17:35:00 2018 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZookeeperClient.java  | 126 +++++++++++++------
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  13 +-
 .../zk/internal/ZookeeperClientTest.java        |  93 ++++++++++++++
 3 files changed, 186 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 6cc77a5..b58f0ce 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -286,35 +287,64 @@ public class ZookeeperClient implements Watcher {
     }
 
     /**
-     *
      * @param paths Paths to create.
      * @param createMode Create mode.
-     * @throws KeeperException.NodeExistsException If at least one of target node already exists.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
     void createAll(List<String> paths, CreateMode createMode)
-        throws ZookeeperClientFailedException, InterruptedException, KeeperException.NodeExistsException
-    {
-        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
-        List<Op> ops = new ArrayList<>(paths.size());
+        throws ZookeeperClientFailedException, InterruptedException {
+        if (paths.isEmpty())
+            return;
 
-        for (String path : paths)
-            ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+        List<List<Op>> batches = new LinkedList<>();
 
-        for (;;) {
-            long connStartTime = this.connStartTime;
+        int batchSize = 0;
 
-            try {
-                zk.multi(ops);
+        List<Op> batch = new LinkedList<>();
 
-                return;
-            }
-            catch (KeeperException.NodeExistsException e) {
-                throw e;
+        for (String path : paths) {
+            //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187
+            int size = requestOverhead(path) + 48 /* overhead */;
+
+            assert size <= MAX_REQ_SIZE;
+
+            if (batchSize + size > MAX_REQ_SIZE) {
+                batches.add(batch);
+
+                batch = new LinkedList<>();
+
+                batchSize = 0;
             }
-            catch (Exception e) {
-                onZookeeperError(connStartTime, e);
+
+            batch.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+
+            batchSize += size;
+        }
+
+        batches.add(batch);
+
+        for (List<Op> ops : batches) {
+            for (;;) {
+                long connStartTime = this.connStartTime;
+
+                try {
+                    zk.multi(ops);
+
+                    break;
+                }
+                catch (KeeperException.NodeExistsException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to create nodes using bulk operation: " + e);
+
+                    for (Op op : ops)
+                        createIfNeeded(op.getPath(), null, createMode);
+
+                    break;
+                }
+                catch (Exception e) {
+                    onZookeeperError(connStartTime, e);
+                }
             }
         }
     }
@@ -560,38 +590,64 @@ public class ZookeeperClient implements Watcher {
      * @param parent Parent path.
      * @param paths Children paths.
      * @param ver Version.
-     * @throws KeeperException.NoNodeException If at least one of nodes does not exist.
      * @throws ZookeeperClientFailedException If connection to zk was lost.
      * @throws InterruptedException If interrupted.
      */
     void deleteAll(@Nullable String parent, List<String> paths, int ver)
-        throws KeeperException.NoNodeException, ZookeeperClientFailedException, InterruptedException
-    {
+        throws ZookeeperClientFailedException, InterruptedException {
         if (paths.isEmpty())
             return;
 
-        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
-        List<Op> ops = new ArrayList<>(paths.size());
+        List<List<Op>> batches = new LinkedList<>();
+
+        int batchSize = 0;
+
+        List<Op> batch = new LinkedList<>();
 
         for (String path : paths) {
             String path0 = parent != null ? parent + "/" + path : path;
 
-            ops.add(Op.delete(path0, ver));
-        }
+            //TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187
+            int size = requestOverhead(path0) + 17 /* overhead */;
 
-        for (;;) {
-            long connStartTime = this.connStartTime;
+            assert size <= MAX_REQ_SIZE;
 
-            try {
-                zk.multi(ops);
+            if (batchSize + size > MAX_REQ_SIZE) {
+                batches.add(batch);
 
-                return;
-            }
-            catch (KeeperException.NoNodeException e) {
-                throw e;
+                batch = new LinkedList<>();
+
+                batchSize = 0;
             }
-            catch (Exception e) {
-                onZookeeperError(connStartTime, e);
+
+            batch.add(Op.delete(path0, ver));
+
+            batchSize += size;
+        }
+
+        batches.add(batch);
+
+        for (List<Op> ops : batches) {
+            for (;;) {
+                long connStartTime = this.connStartTime;
+
+                try {
+                    zk.multi(ops);
+
+                    break;
+                }
+                catch (KeeperException.NoNodeException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to delete nodes using bulk operation: " + e);
+
+                    for (Op op : ops)
+                        deleteIfExists(op.getPath(), ver);
+
+                    break;
+                }
+                catch (Exception e) {
+                    onZookeeperError(connStartTime, e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 8b17622..dc687b8 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -814,17 +814,8 @@ public class ZookeeperDiscoveryImpl {
                     dirs.add(dir);
             }
 
-            try {
-                if (!dirs.isEmpty())
-                    client.createAll(dirs, PERSISTENT);
-            }
-            catch (KeeperException.NodeExistsException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to create nodes using bulk operation: " + e);
-
-                for (String dir : dirs)
-                    client.createIfNeeded(dir, null, PERSISTENT);
-            }
+            if (!dirs.isEmpty())
+                client.createAll(dirs, PERSISTENT);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2e1b4b9c/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index d228e03..0d64980 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -142,6 +142,51 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCreateAllRequestOverflow() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        int cnt = 20_000;
+
+        List<String> paths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            paths.add("/apacheIgnite/" + i);
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(cnt, client.getChildren("/apacheIgnite").size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCreateAllNodeExists() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+
+        List<String> paths = new ArrayList<>();
+
+        paths.add("/apacheIgnite/1");
+        paths.add("/apacheIgnite/2");
+        paths.add("/apacheIgnite/3");
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(3, client.getChildren("/apacheIgnite").size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDeleteAll() throws Exception {
         startZK(1);
 
@@ -164,6 +209,54 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDeleteAllRequestOverflow() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+
+        int cnt = 30_000;
+
+        List<String> paths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            paths.add("/apacheIgnite/" + i);
+
+        client.createAll(paths, CreateMode.PERSISTENT);
+
+        assertEquals(cnt, client.getChildren("/apacheIgnite").size());
+
+        List<String> subPaths = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            subPaths.add(String.valueOf(i));
+
+        client.deleteAll("/apacheIgnite", subPaths, -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeleteAllNoNode() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = createClient(SES_TIMEOUT);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/2", null, CreateMode.PERSISTENT);
+
+        client.deleteAll("/apacheIgnite", Arrays.asList("1", "2", "3"), -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionLoss1() throws Exception {
         ZookeeperClient client = new ZookeeperClient(log, "localhost:2200", 3000, null);
 

Reply via email to