Ported TestFramework to validate some of the new APIs

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8d13522
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8d13522
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8d13522

Branch: refs/heads/CURATOR-3.0
Commit: e8d1352253ba0df915bb478e3bc40c2156477494
Parents: 2fa1a69
Author: randgalt <randg...@apache.org>
Authored: Sat Jan 7 01:38:52 2017 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Jan 7 01:38:52 2017 -0500

----------------------------------------------------------------------
 .../x/async/details/AsyncCreateBuilderImpl.java |   2 +-
 .../x/async/details/AsyncExistsBuilderImpl.java |   7 +-
 .../curator/framework/imps/TestFramework.java   | 613 +++++++++++++++++++
 .../framework/imps/TestFrameworkBackground.java | 272 ++++++++
 4 files changed, 892 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index 7723775..b3c91b3 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -132,7 +132,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder
         CreateBuilderImpl builder = new CreateBuilderImpl(client,
             createMode,
             common.backgrounding,
-            options.contains(CreateOption.createParentsIfNeeded),
+            options.contains(CreateOption.createParentsIfNeeded) || 
options.contains(CreateOption.createParentsAsContainers),
             options.contains(CreateOption.createParentsAsContainers),
             options.contains(CreateOption.doProtected),
             options.contains(CreateOption.compress),

http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
index d672047..d3bb8ed 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java
@@ -59,7 +59,12 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder
     public AsyncStage<Stat> forPath(String path)
     {
         BuilderCommon<Stat> common = new 
BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc);
-        ExistsBuilderImpl builder = new ExistsBuilderImpl(client, 
common.backgrounding, common.watcher, 
options.contains(ExistsOption.createParentsIfNeeded), 
options.contains(ExistsOption.createParentsAsContainers));
+        ExistsBuilderImpl builder = new ExistsBuilderImpl(client,
+            common.backgrounding,
+            common.watcher,
+            options.contains(ExistsOption.createParentsIfNeeded),
+            options.contains(ExistsOption.createParentsAsContainers)
+        );
         return safeCall(common.internalCallback, () -> builder.forPath(path));
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
new file mode 100644
index 0000000..cd2f598
--- /dev/null
+++ 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -0,0 +1,613 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncStage;
+import org.apache.curator.x.async.api.CreateOption;
+import org.apache.curator.x.async.api.DeleteOption;
+import org.apache.curator.x.async.api.ExistsOption;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings("deprecation")
+public class TestFramework extends BaseClassForTests
+{
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        System.setProperty("znode.container.checkIntervalMs", "1000");
+        super.setup();
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        System.clearProperty("znode.container.checkIntervalMs");
+        super.teardown();
+    }
+
+    @Test
+    public void testQuietDelete() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            
async.delete().withOptions(EnumSet.of(DeleteOption.quietly)).forPath("/foo/bar");
+
+            final BlockingQueue<Integer> rc = new LinkedBlockingQueue<>();
+            BackgroundCallback backgroundCallback = (client1, event) -> 
rc.add(event.getResultCode());
+            
async.delete().withOptions(EnumSet.of(DeleteOption.quietly)).forPath("/foo/bar/hey").handle((v,
 e) -> {
+                if ( e == null )
+                {
+                    rc.add(KeeperException.Code.OK.intValue());
+                }
+                else
+                {
+                    rc.add(((KeeperException)e).code().intValue());
+                }
+                return null;
+            });
+
+            Integer code = rc.poll(new Timing().milliseconds(), 
TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(code);
+            Assert.assertEquals(code.intValue(), 
KeeperException.Code.OK.intValue());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testNamespaceWithWatcher() throws Exception
+    {
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = 
builder.connectString(server.getConnectString()).namespace("aisa").retryPolicy(new
 RetryOneTime(1)).build();
+        client.start();
+        try
+        {
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+            async.create().forPath("/base").
+                thenRun(() -> 
async.watched().getChildren().forPath("/base").event().handle((event, x) -> {
+                    try
+                    {
+                        queue.put(event.getPath());
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        throw new Error(e);
+                    }
+                    return null;
+                }))
+                .thenRun(() -> async.create().forPath("/base/child"));
+
+            String path = queue.take();
+            Assert.assertEquals(path, "/base");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateACLSingleAuth() throws Exception
+    {
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = builder
+            .connectString(server.getConnectString())
+            .authorization("digest", "me1:pass1".getBytes())
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+        client.start();
+        try
+        {
+            ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS);
+            List<ACL> aclList = Lists.newArrayList(acl);
+            client.create().withACL(aclList).forPath("/test", 
"test".getBytes());
+            client.close();
+
+            // Try setting data with me1:pass1
+            client = builder
+                .connectString(server.getConnectString())
+                .authorization("digest", "me1:pass1".getBytes())
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+            client.start();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+            }
+            catch ( ExecutionException e )
+            {
+                Assert.fail("Auth failed");
+            }
+            client.close();
+
+            // Try setting data with something:else
+            client = builder
+                .connectString(server.getConnectString())
+                .authorization("digest", "something:else".getBytes())
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+            client.start();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+                Assert.fail("Should have failed with auth exception");
+            }
+            catch ( ExecutionException e )
+            {
+                // expected
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateACLMultipleAuths() throws Exception
+    {
+        // Add a few authInfos
+        List<AuthInfo> authInfos = new ArrayList<AuthInfo>();
+        authInfos.add(new AuthInfo("digest", "me1:pass1".getBytes()));
+        authInfos.add(new AuthInfo("digest", "me2:pass2".getBytes()));
+
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = builder
+            .connectString(server.getConnectString())
+            .authorization(authInfos)
+            .retryPolicy(new RetryOneTime(1))
+            .build();
+        client.start();
+        try
+        {
+            ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS);
+            List<ACL> aclList = Lists.newArrayList(acl);
+            client.create().withACL(aclList).forPath("/test", 
"test".getBytes());
+            client.close();
+
+            // Try setting data with me1:pass1
+            client = builder
+                .connectString(server.getConnectString())
+                .authorization("digest", "me1:pass1".getBytes())
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+            client.start();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+            }
+            catch ( ExecutionException e )
+            {
+                Assert.fail("Auth failed");
+            }
+            client.close();
+
+            // Try setting data with me2:pass2
+            client = builder
+                .connectString(server.getConnectString())
+                .authorization("digest", "me2:pass2".getBytes())
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+            client.start();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+            }
+            catch ( ExecutionException e )
+            {
+                Assert.fail("Auth failed");
+            }
+            client.close();
+
+            // Try setting data with something:else
+            client = builder
+                .connectString(server.getConnectString())
+                .authorization("digest", "something:else".getBytes())
+                .retryPolicy(new RetryOneTime(1))
+                .build();
+            client.start();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+                Assert.fail("Should have failed with auth exception");
+            }
+            catch ( ExecutionException e )
+            {
+                // expected
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateACLWithReset() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = builder
+            .connectString(server.getConnectString())
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .authorization("digest", "me:pass".getBytes())
+            .retryPolicy(new ExponentialBackoffRetry(100, 5))
+            .build();
+        client.start();
+        try
+        {
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = (client1, newState) ->
+            {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
+            ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS);
+            List<ACL> aclList = Lists.newArrayList(acl);
+            client.create().withACL(aclList).forPath("/test", 
"test".getBytes());
+
+            server.stop();
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.checkExists().forPath("/").toCompletableFuture().get();
+                Assert.fail("Connection should be down");
+            }
+            catch ( ExecutionException e )
+            {
+                // expected
+            }
+
+            server.restart();
+            try
+            {
+                AsyncCuratorFramework async = 
AsyncCuratorFramework.wrap(client);
+                async.setData().forPath("/test", 
"test".getBytes()).toCompletableFuture().get();
+            }
+            catch ( ExecutionException e )
+            {
+                Assert.fail("Auth failed", e);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateParents() throws Exception
+    {
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = 
builder.connectString(server.getConnectString()).retryPolicy(new 
RetryOneTime(1)).build();
+        client.start();
+        try
+        {
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three",
 "foo".getBytes()).toCompletableFuture().get();
+            byte[] data = 
async.getData().forPath("/one/two/three").toCompletableFuture().get();
+            Assert.assertEquals(data, "foo".getBytes());
+
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/another",
 "bar".getBytes());
+            data = 
async.getData().forPath("/one/two/another").toCompletableFuture().get();
+            Assert.assertEquals(data, "bar".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateParentContainers() throws Exception
+    {
+        if ( !checkForContainers() )
+        {
+            return;
+        }
+
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = 
builder.connectString(server.getConnectString()).retryPolicy(new 
RetryOneTime(1)).build();
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsAsContainers)).forPath("/one/two/three",
 "foo".getBytes()).toCompletableFuture().get();
+            byte[] data = 
async.getData().forPath("/one/two/three").toCompletableFuture().get();
+            Assert.assertEquals(data, "foo".getBytes());
+
+            
async.delete().forPath("/one/two/three").toCompletableFuture().get();
+            new Timing().sleepABit();
+
+            
Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+            new Timing().sleepABit();
+            
Assert.assertNull(async.checkExists().forPath("/one").toCompletableFuture().get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private boolean checkForContainers()
+    {
+        if ( ZKPaths.getContainerCreateMode() == CreateMode.PERSISTENT )
+        {
+            System.out.println("Not using CreateMode.CONTAINER enabled version 
of ZooKeeper");
+            return false;
+        }
+        return true;
+    }
+
+    @Test
+    public void testCreatingParentsTheSame() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get();
+            
Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+
+            
async.delete().withOptions(EnumSet.of(DeleteOption.deletingChildrenIfNeeded)).forPath("/one").toCompletableFuture().get();
+            Assert.assertNull(client.checkExists().forPath("/one"));
+
+            
Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+            
async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get();
+            
Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+            
Assert.assertNull(async.checkExists().forPath("/one/two/three").toCompletableFuture().get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testExistsCreatingParents() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            
Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+            
async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get();
+            
Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+            
Assert.assertNull(async.checkExists().forPath("/one/two/three").toCompletableFuture().get());
+            
Assert.assertNull(async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testSyncNew() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/head");
+            Assert.assertNotNull(client.checkExists().forPath("/head"));
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.sync().forPath("/head").handle((v, e) -> {
+                Assert.assertNull(v);
+                Assert.assertNull(e);
+                latch.countDown();
+                return null;
+            });
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBackgroundDelete() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            CountDownLatch latch = new CountDownLatch(1);
+            async.create().forPath("/head").thenRun(() ->
+                async.delete().forPath("/head").handle((v, e) -> {
+                    Assert.assertNull(v);
+                    Assert.assertNull(e);
+                    latch.countDown();
+                    return null;
+                })
+            );
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            Assert.assertNull(client.checkExists().forPath("/head"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBackgroundDeleteWithChildren() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.getCuratorListenable().addListener
+                ((client1, event) ->
+                {
+                    if ( event.getType() == CuratorEventType.DELETE )
+                    {
+                        Assert.assertEquals(event.getPath(), "/one/two");
+                        ((CountDownLatch)event.getContext()).countDown();
+                    }
+                });
+
+            CountDownLatch latch = new CountDownLatch(1);
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three/four").thenRun(()
 ->
+                
async.delete().withOptions(EnumSet.of(DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two").handle((v,
 e) -> {
+                    Assert.assertNull(v);
+                    Assert.assertNull(e);
+                    latch.countDown();
+                    return null;
+                })
+            );
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testDeleteGuaranteedWithChildren() throws Exception
+    {
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
+        CuratorFramework client = 
builder.connectString(server.getConnectString()).retryPolicy(new 
RetryOneTime(1)).build();
+        client.start();
+        try
+        {
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            
async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three/four/five/six",
 "foo".getBytes()).toCompletableFuture().get();
+            async.delete().withOptions(EnumSet.of(DeleteOption.guaranteed, 
DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two/three/four/five").toCompletableFuture().get();
+            
Assert.assertNull(async.checkExists().forPath("/one/two/three/four/five").toCompletableFuture().get());
+            async.delete().withOptions(EnumSet.of(DeleteOption.guaranteed, 
DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two").toCompletableFuture().get();
+            
Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testGetSequentialChildren() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            Semaphore semaphore = new Semaphore(0);
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.create().forPath("/head").thenRun(() -> {
+                for ( int i = 0; i < 10; ++i )
+                {
+                    
async.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child").thenRun(semaphore::release);
+                }
+            });
+
+            Assert.assertTrue(new Timing().acquireSemaphore(semaphore, 10));
+            List<String> children = 
async.getChildren().forPath("/head").toCompletableFuture().get();
+            Assert.assertEquals(children.size(), 10);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBackgroundGetDataWithWatch() throws Exception
+    {
+        final byte[] data1 = {1, 2, 3};
+        final byte[] data2 = {4, 5, 6, 7};
+
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            async.create().forPath("/test", data1).toCompletableFuture().get();
+
+            CountDownLatch watchedLatch = new CountDownLatch(1);
+            CountDownLatch backgroundLatch = new CountDownLatch(1);
+            AsyncStage<byte[]> stage = 
async.watched().getData().forPath("/test");
+            stage.event().handle((event, x) -> {
+                Assert.assertEquals(event.getPath(), "/test");
+                watchedLatch.countDown();
+                return null;
+            });
+            stage.handle((d, x) -> {
+                Assert.assertEquals(d, data1);
+                backgroundLatch.countDown();
+                return null;
+            });
+
+            Assert.assertTrue(backgroundLatch.await(10, TimeUnit.SECONDS));
+
+            async.setData().forPath("/test", data2);
+            Assert.assertTrue(watchedLatch.await(10, TimeUnit.SECONDS));
+            byte[] checkData = 
async.getData().forPath("/test").toCompletableFuture().get();
+            Assert.assertEquals(checkData, data2);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
new file mode 100644
index 0000000..ce41d08
--- /dev/null
+++ 
b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -0,0 +1,272 @@
+package org.apache.curator.framework.imps;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestFrameworkBackground extends BaseClassForTests
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Test
+    public void testErrorListener() throws Exception
+    {
+        //The first call to the ACL provider will return a reasonable
+        //value. The second will throw an error. This is because the ACL
+        //provider is accessed prior to the backgrounding call.
+        final AtomicBoolean aclProviderCalled = new AtomicBoolean(false);
+        
+        ACLProvider badAclProvider = new ACLProvider()
+        {
+            @Override
+            public List<ACL> getDefaultAcl()
+            {
+                if(aclProviderCalled.getAndSet(true))
+                {
+                    throw new UnsupportedOperationException();
+                }
+                else
+                {
+                    return new ArrayList<>();
+                }
+            }
+
+            @Override
+            public List<ACL> getAclForPath(String path)
+            {
+                if(aclProviderCalled.getAndSet(true))
+                {
+                    throw new UnsupportedOperationException();
+                }
+                else
+                {
+                    return new ArrayList<>();
+                }
+            }
+        };
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .retryPolicy(new RetryOneTime(1))
+            .aclProvider(badAclProvider)
+            .build();
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            final CountDownLatch errorLatch = new CountDownLatch(1);
+            UnhandledErrorListener listener = (message, e) -> {
+                if ( e instanceof UnsupportedOperationException )
+                {
+                    errorLatch.countDown();
+                }
+            };
+            
async.withUnhandledErrorListener(listener).create().forPath("/foo");
+            Assert.assertTrue(new Timing().awaitLatch(errorLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Starts a client while the server is stopped, queues a background create,
+     * restarts the server and verifies that the first connection state the
+     * listener receives is {@link ConnectionState#CONNECTED}.
+     */
+    @Test
+    public void testListenerConnectedAtStart() throws Exception
+    {
+        server.stop();
+
+        Timing timing = new Timing(2);
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(),
+            timing.session(),
+            timing.connection(),
+            new RetryNTimes(0, 0));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final AtomicBoolean firstListenerAction = new AtomicBoolean(true);
+            final AtomicReference<ConnectionState> firstListenerState = new AtomicReference<>();
+            ConnectionStateListener listener = (ignoredClient, newState) ->
+            {
+                // record only the very first state change that is delivered
+                if ( firstListenerAction.compareAndSet(true, false) )
+                {
+                    firstListenerState.set(newState);
+                    // use the class logger rather than stdout
+                    log.info("First listener state is {}", newState);
+                }
+                if ( newState == ConnectionState.CONNECTED )
+                {
+                    connectedLatch.countDown();
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
+            // due to CURATOR-72, this was causing a LOST event to precede the CONNECTED event
+            async.create().forPath("/foo");
+
+            server.restart();
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+            // the flag is cleared once the listener has been invoked at least once
+            Assert.assertFalse(firstListenerAction.get());
+            ConnectionState firstConnectionState = firstListenerState.get();
+            Assert.assertEquals(firstConnectionState, ConnectionState.CONNECTED, "First listener state MUST BE CONNECTED but is " + firstConnectionState);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Stops the server, issues a background create and checks that every
+     * attempt after the first, as observed through the debug listener, happens
+     * at least {@code SLEEP} milliseconds after the previous one.
+     */
+    @Test
+    public void testRetries() throws Exception
+    {
+        final int SLEEP = 1000;
+        final int TIMES = 5;
+
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+            server.getConnectString(),
+            timing.session(),
+            timing.connection(),
+            new RetryNTimes(TIMES, SLEEP));
+        try
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            client.getZookeeperClient().blockUntilConnectedOrTimedOut();
+
+            final CountDownLatch latch = new CountDownLatch(TIMES);
+            final List<Long> times = Lists.newArrayList();
+            final AtomicLong start = new AtomicLong(System.currentTimeMillis());
+            ((CuratorFrameworkImpl)client).debugListener = data ->
+            {
+                if ( data.getOperation().getClass().getName().contains("CreateBuilderImpl") )
+                {
+                    long now = System.currentTimeMillis();
+                    // guarded: the listener stays installed after the latch opens, so it
+                    // may append while the test thread is reading the list
+                    synchronized(times)
+                    {
+                        times.add(now - start.get());
+                    }
+                    start.set(now);
+                    latch.countDown();
+                }
+            };
+
+            server.stop();
+            async.create().forPath("/one");
+
+            // bounded wait so that missing attempts fail the test instead of hanging it
+            Assert.assertTrue(timing.awaitLatch(latch), "Expected " + TIMES + " create attempts");
+
+            // verify against a private snapshot, not the list the listener still writes to
+            final List<Long> recorded;
+            synchronized(times)
+            {
+                recorded = new ArrayList<>(times);
+            }
+            for ( long elapsed : recorded.subList(1, recorded.size()) )   // first one isn't a retry
+            {
+                Assert.assertTrue(elapsed >= SLEEP, elapsed + ": " + recorded);
+            }
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Attempt a background operation while Zookeeper server is down.
+     * Return code must be {@link 
org.apache.zookeeper.KeeperException.Code#CONNECTIONLOSS}
+     */
+    @Test
+    public void testCuratorCallbackOnError() throws Exception
+    {
+        Timing timing = new Timing();
+        final CountDownLatch latch = new CountDownLatch(1);
+        try ( CuratorFramework client = 
CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).connectionTimeoutMs(timing.connection()).retryPolicy(new
 RetryOneTime(1000)).build() )
+        {
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+            // Stop the Zookeeper server
+            server.stop();
+            // Attempt to retrieve children list
+            async.getChildren().forPath("/").handle((children, e) -> {
+                if ( e instanceof KeeperException.ConnectionLossException )
+                {
+                    latch.countDown();
+                }
+                return null;
+            });
+            // Check if the callback has been called with a correct return code
+            Assert.assertTrue(timing.awaitLatch(latch), "Callback has not been 
called by curator !");
+        }
+    }
+
+    /**
+     * CURATOR-126
+     * Shutdown the Curator client while there are still background operations 
running.
+     */
+    @Test
+    public void testShutdown() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory
+            .builder()
+            .connectString(server.getConnectString())
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection()).retryPolicy(new 
RetryOneTime(1))
+            .maxCloseWaitMs(timing.forWaiting().milliseconds())
+            .build();
+        try
+        {
+            final AtomicBoolean hadIllegalStateException = new 
AtomicBoolean(false);
+            ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = 
(message, e) ->
+            {
+                if ( e instanceof IllegalStateException )
+                {
+                    hadIllegalStateException.set(true);
+                }
+            };
+            client.start();
+            AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
+
+            final CountDownLatch operationReadyLatch = new CountDownLatch(1);
+            ((CuratorFrameworkImpl)client).debugListener = data ->
+            {
+                try
+                {
+                    operationReadyLatch.await();
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
+                }
+            };
+
+            // queue a background operation that will block due to the 
debugListener
+            async.create().forPath("/hey");
+            timing.sleepABit();
+
+            // close the client while the background is still blocked
+            client.close();
+
+            // unblock the background
+            operationReadyLatch.countDown();
+            timing.sleepABit();
+
+            // should not generate an exception
+            Assert.assertFalse(hadIllegalStateException.get());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}

Reply via email to