Ported TestFramework to validate some of the new APIs
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e8d13522 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e8d13522 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e8d13522 Branch: refs/heads/CURATOR-3.0 Commit: e8d1352253ba0df915bb478e3bc40c2156477494 Parents: 2fa1a69 Author: randgalt <randg...@apache.org> Authored: Sat Jan 7 01:38:52 2017 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Jan 7 01:38:52 2017 -0500 ---------------------------------------------------------------------- .../x/async/details/AsyncCreateBuilderImpl.java | 2 +- .../x/async/details/AsyncExistsBuilderImpl.java | 7 +- .../curator/framework/imps/TestFramework.java | 613 +++++++++++++++++++ .../framework/imps/TestFrameworkBackground.java | 272 ++++++++ 4 files changed, 892 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index 7723775..b3c91b3 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -132,7 +132,7 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder CreateBuilderImpl builder = new CreateBuilderImpl(client, createMode, common.backgrounding, - options.contains(CreateOption.createParentsIfNeeded), + options.contains(CreateOption.createParentsIfNeeded) || options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.doProtected), options.contains(CreateOption.compress), http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java index d672047..d3bb8ed 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncExistsBuilderImpl.java @@ -59,7 +59,12 @@ class AsyncExistsBuilderImpl implements AsyncExistsBuilder public AsyncStage<Stat> forPath(String path) { BuilderCommon<Stat> common = new BuilderCommon<>(unhandledErrorListener, watchMode, safeStatProc); - ExistsBuilderImpl builder = new ExistsBuilderImpl(client, common.backgrounding, common.watcher, options.contains(ExistsOption.createParentsIfNeeded), options.contains(ExistsOption.createParentsAsContainers)); + ExistsBuilderImpl builder = new ExistsBuilderImpl(client, + common.backgrounding, + common.watcher, + options.contains(ExistsOption.createParentsIfNeeded), + options.contains(ExistsOption.createParentsAsContainers) + ); return safeCall(common.internalCallback, () -> builder.forPath(path)); } } http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java new file mode 100644 index 0000000..cd2f598 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -0,0 +1,613 @@ +package org.apache.curator.framework.imps; + +import com.google.common.collect.Lists; +import org.apache.curator.framework.AuthInfo; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.AsyncStage; +import org.apache.curator.x.async.api.CreateOption; +import org.apache.curator.x.async.api.DeleteOption; +import org.apache.curator.x.async.api.ExistsOption; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("deprecation") +public class TestFramework extends BaseClassForTests +{ + @BeforeMethod + @Override + public void setup() throws Exception + { + System.setProperty("znode.container.checkIntervalMs", "1000"); + super.setup(); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + System.clearProperty("znode.container.checkIntervalMs"); + super.teardown(); + } + + @Test + public void testQuietDelete() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + async.delete().withOptions(EnumSet.of(DeleteOption.quietly)).forPath("/foo/bar"); + + final BlockingQueue<Integer> rc = new LinkedBlockingQueue<>(); + BackgroundCallback backgroundCallback = (client1, event) -> rc.add(event.getResultCode()); + async.delete().withOptions(EnumSet.of(DeleteOption.quietly)).forPath("/foo/bar/hey").handle((v, e) -> { + if ( e == null ) + { + rc.add(KeeperException.Code.OK.intValue()); + } + else + { + rc.add(((KeeperException)e).code().intValue()); + } + return null; + }); + + Integer code = rc.poll(new Timing().milliseconds(), TimeUnit.MILLISECONDS); + Assert.assertNotNull(code); + Assert.assertEquals(code.intValue(), KeeperException.Code.OK.intValue()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testNamespaceWithWatcher() throws Exception + { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(server.getConnectString()).namespace("aisa").retryPolicy(new RetryOneTime(1)).build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); + async.create().forPath("/base"). + thenRun(() -> async.watched().getChildren().forPath("/base").event().handle((event, x) -> { + try + { + queue.put(event.getPath()); + } + catch ( InterruptedException e ) + { + throw new Error(e); + } + return null; + })) + .thenRun(() -> async.create().forPath("/base/child")); + + String path = queue.take(); + Assert.assertEquals(path, "/base"); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCreateACLSingleAuth() throws Exception + { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder + .connectString(server.getConnectString()) + .authorization("digest", "me1:pass1".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); + List<ACL> aclList = Lists.newArrayList(acl); + client.create().withACL(aclList).forPath("/test", "test".getBytes()); + client.close(); + + // Try setting data with me1:pass1 + client = builder + .connectString(server.getConnectString()) + .authorization("digest", "me1:pass1".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + } + catch ( ExecutionException e ) + { + Assert.fail("Auth failed"); + } + client.close(); + + // Try setting data with something:else + client = builder + .connectString(server.getConnectString()) + .authorization("digest", "something:else".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + Assert.fail("Should have failed with auth exception"); + } + catch ( ExecutionException e ) + { + // expected + } + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCreateACLMultipleAuths() throws Exception + { + // Add a few authInfos + List<AuthInfo> authInfos = new ArrayList<AuthInfo>(); + authInfos.add(new AuthInfo("digest", "me1:pass1".getBytes())); + authInfos.add(new AuthInfo("digest", "me2:pass2".getBytes())); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder + .connectString(server.getConnectString()) + .authorization(authInfos) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); + List<ACL> aclList = Lists.newArrayList(acl); + client.create().withACL(aclList).forPath("/test", "test".getBytes()); + client.close(); + + // Try setting data with me1:pass1 + client = builder + .connectString(server.getConnectString()) + .authorization("digest", "me1:pass1".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + } + catch ( ExecutionException e ) + { + Assert.fail("Auth failed"); + } + client.close(); + + // Try setting data with me1:pass1 + client = builder + .connectString(server.getConnectString()) + .authorization("digest", "me2:pass2".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + } + catch ( ExecutionException e ) + { + Assert.fail("Auth failed"); + } + client.close(); + + // Try setting data with something:else + client = builder + .connectString(server.getConnectString()) + .authorization("digest", "something:else".getBytes()) + .retryPolicy(new RetryOneTime(1)) + .build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + Assert.fail("Should have failed with auth exception"); + } + catch ( ExecutionException e ) + { + // expected + } + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCreateACLWithReset() throws Exception + { + Timing timing = new Timing(); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder + .connectString(server.getConnectString()) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()) + .authorization("digest", "me:pass".getBytes()) + .retryPolicy(new ExponentialBackoffRetry(100, 5)) + .build(); + client.start(); + try + { + final CountDownLatch lostLatch = new CountDownLatch(1); + ConnectionStateListener listener = (client1, newState) -> + { + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + }; + client.getConnectionStateListenable().addListener(listener); + + ACL acl = new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.AUTH_IDS); + List<ACL> aclList = Lists.newArrayList(acl); + client.create().withACL(aclList).forPath("/test", "test".getBytes()); + + server.stop(); + Assert.assertTrue(timing.awaitLatch(lostLatch)); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.checkExists().forPath("/").toCompletableFuture().get(); + Assert.fail("Connection should be down"); + } + catch ( ExecutionException e ) + { + // expected + } + + server.restart(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.setData().forPath("/test", "test".getBytes()).toCompletableFuture().get(); + } + catch ( ExecutionException e ) + { + Assert.fail("Auth failed", e); + } + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCreateParents() throws Exception + { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three", "foo".getBytes()).toCompletableFuture().get(); + byte[] data = async.getData().forPath("/one/two/three").toCompletableFuture().get(); + Assert.assertEquals(data, "foo".getBytes()); + + async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/another", "bar".getBytes()); + data = async.getData().forPath("/one/two/another").toCompletableFuture().get(); + Assert.assertEquals(data, "bar".getBytes()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testCreateParentContainers() throws Exception + { + if ( !checkForContainers() ) + { + return; + } + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().withOptions(EnumSet.of(CreateOption.createParentsAsContainers)).forPath("/one/two/three", "foo".getBytes()).toCompletableFuture().get(); + byte[] data = async.getData().forPath("/one/two/three").toCompletableFuture().get(); + Assert.assertEquals(data, "foo".getBytes()); + + async.delete().forPath("/one/two/three").toCompletableFuture().get(); + new Timing().sleepABit(); + + Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + new Timing().sleepABit(); + Assert.assertNull(async.checkExists().forPath("/one").toCompletableFuture().get()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + private boolean checkForContainers() + { + if ( ZKPaths.getContainerCreateMode() == CreateMode.PERSISTENT ) + { + System.out.println("Not using CreateMode.CONTAINER enabled version of ZooKeeper"); + return false; + } + return true; + } + + @Test + public void testCreatingParentsTheSame() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + Assert.assertNull(client.checkExists().forPath("/one/two")); + async.create().withOptions(EnumSet.of(CreateOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get(); + Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + + async.delete().withOptions(EnumSet.of(DeleteOption.deletingChildrenIfNeeded)).forPath("/one").toCompletableFuture().get(); + Assert.assertNull(client.checkExists().forPath("/one")); + + Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get(); + Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + Assert.assertNull(async.checkExists().forPath("/one/two/three").toCompletableFuture().get()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testExistsCreatingParents() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get(); + Assert.assertNotNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + Assert.assertNull(async.checkExists().forPath("/one/two/three").toCompletableFuture().get()); + Assert.assertNull(async.checkExists().withOptions(EnumSet.of(ExistsOption.createParentsAsContainers)).forPath("/one/two/three").toCompletableFuture().get()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testSyncNew() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + client.create().forPath("/head"); + Assert.assertNotNull(client.checkExists().forPath("/head")); + + final CountDownLatch latch = new CountDownLatch(1); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.sync().forPath("/head").handle((v, e) -> { + Assert.assertNull(v); + Assert.assertNull(e); + latch.countDown(); + return null; + }); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testBackgroundDelete() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + CountDownLatch latch = new CountDownLatch(1); + async.create().forPath("/head").thenRun(() -> + async.delete().forPath("/head").handle((v, e) -> { + Assert.assertNull(v); + Assert.assertNull(e); + latch.countDown(); + return null; + }) + ); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertNull(client.checkExists().forPath("/head")); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testBackgroundDeleteWithChildren() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + client.getCuratorListenable().addListener + ((client1, event) -> + { + if ( event.getType() == CuratorEventType.DELETE ) + { + Assert.assertEquals(event.getPath(), "/one/two"); + ((CountDownLatch)event.getContext()).countDown(); + } + }); + + CountDownLatch latch = new CountDownLatch(1); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three/four").thenRun(() -> + async.delete().withOptions(EnumSet.of(DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two").handle((v, e) -> { + Assert.assertNull(v); + Assert.assertNull(e); + latch.countDown(); + return null; + }) + ); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); + Assert.assertNull(client.checkExists().forPath("/one/two")); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testDeleteGuaranteedWithChildren() throws Exception + { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().withOptions(EnumSet.of(CreateOption.createParentsIfNeeded)).forPath("/one/two/three/four/five/six", "foo".getBytes()).toCompletableFuture().get(); + async.delete().withOptions(EnumSet.of(DeleteOption.guaranteed, DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two/three/four/five").toCompletableFuture().get(); + Assert.assertNull(async.checkExists().forPath("/one/two/three/four/five").toCompletableFuture().get()); + async.delete().withOptions(EnumSet.of(DeleteOption.guaranteed, DeleteOption.deletingChildrenIfNeeded)).forPath("/one/two").toCompletableFuture().get(); + Assert.assertNull(async.checkExists().forPath("/one/two").toCompletableFuture().get()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testGetSequentialChildren() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + Semaphore semaphore = new Semaphore(0); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().forPath("/head").thenRun(() -> { + for ( int i = 0; i < 10; ++i ) + { + async.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child").thenRun(semaphore::release); + } + }); + + Assert.assertTrue(new Timing().acquireSemaphore(semaphore, 10)); + List<String> children = async.getChildren().forPath("/head").toCompletableFuture().get(); + Assert.assertEquals(children.size(), 10); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testBackgroundGetDataWithWatch() throws Exception + { + final byte[] data1 = {1, 2, 3}; + final byte[] data2 = {4, 5, 6, 7}; + + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + async.create().forPath("/test", data1).toCompletableFuture().get(); + + CountDownLatch watchedLatch = new CountDownLatch(1); + CountDownLatch backgroundLatch = new CountDownLatch(1); + AsyncStage<byte[]> stage = async.watched().getData().forPath("/test"); + stage.event().handle((event, x) -> { + Assert.assertEquals(event.getPath(), "/test"); + watchedLatch.countDown(); + return null; + }); + stage.handle((d, x) -> { + Assert.assertEquals(d, data1); + backgroundLatch.countDown(); + return null; + }); + + Assert.assertTrue(backgroundLatch.await(10, TimeUnit.SECONDS)); + + async.setData().forPath("/test", data2); + Assert.assertTrue(watchedLatch.await(10, TimeUnit.SECONDS)); + byte[] checkData = async.getData().forPath("/test").toCompletableFuture().get(); + Assert.assertEquals(checkData, data2); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/e8d13522/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java new file mode 100644 index 0000000..ce41d08 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -0,0 +1,272 @@ +package org.apache.curator.framework.imps; + +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +public class TestFrameworkBackground extends BaseClassForTests +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Test + public void testErrorListener() throws Exception + { + //The first call to the ACL provider will return a reasonable + //value. The second will throw an error. This is because the ACL + //provider is accessed prior to the backgrounding call. + final AtomicBoolean aclProviderCalled = new AtomicBoolean(false); + + ACLProvider badAclProvider = new ACLProvider() + { + @Override + public List<ACL> getDefaultAcl() + { + if(aclProviderCalled.getAndSet(true)) + { + throw new UnsupportedOperationException(); + } + else + { + return new ArrayList<>(); + } + } + + @Override + public List<ACL> getAclForPath(String path) + { + if(aclProviderCalled.getAndSet(true)) + { + throw new UnsupportedOperationException(); + } + else + { + return new ArrayList<>(); + } + } + }; + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .aclProvider(badAclProvider) + .build(); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + final CountDownLatch errorLatch = new CountDownLatch(1); + UnhandledErrorListener listener = (message, e) -> { + if ( e instanceof UnsupportedOperationException ) + { + errorLatch.countDown(); + } + }; + async.withUnhandledErrorListener(listener).create().forPath("/foo"); + Assert.assertTrue(new Timing().awaitLatch(errorLatch)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testListenerConnectedAtStart() throws Exception + { + server.stop(); + + Timing timing = new Timing(2); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(0, 0)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + final CountDownLatch connectedLatch = new CountDownLatch(1); + final AtomicBoolean firstListenerAction = new AtomicBoolean(true); + final AtomicReference<ConnectionState> firstListenerState = new AtomicReference<>(); + ConnectionStateListener listener = (client1, newState) -> + { + if ( firstListenerAction.compareAndSet(true, false) ) + { + firstListenerState.set(newState); + System.out.println("First listener state is " + newState); + } + if ( newState == ConnectionState.CONNECTED ) + { + connectedLatch.countDown(); + } + }; + client.getConnectionStateListenable().addListener(listener); + + // due to CURATOR-72, this was causing a LOST event to precede the CONNECTED event + async.create().forPath("/foo"); + + server.restart(); + + Assert.assertTrue(timing.awaitLatch(connectedLatch)); + Assert.assertFalse(firstListenerAction.get()); + ConnectionState firstconnectionState = firstListenerState.get(); + Assert.assertEquals(firstconnectionState, ConnectionState.CONNECTED, "First listener state MUST BE CONNECTED but is " + firstconnectionState); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testRetries() throws Exception + { + final int SLEEP = 1000; + final int TIMES = 5; + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(TIMES, SLEEP)); + try + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + client.getZookeeperClient().blockUntilConnectedOrTimedOut(); + + final CountDownLatch latch = new CountDownLatch(TIMES); + final List<Long> times = Lists.newArrayList(); + final AtomicLong start = new AtomicLong(System.currentTimeMillis()); + ((CuratorFrameworkImpl)client).debugListener = data -> + { + if ( data.getOperation().getClass().getName().contains("CreateBuilderImpl") ) + { + long now = System.currentTimeMillis(); + times.add(now - start.get()); + start.set(now); + latch.countDown(); + } + }; + + server.stop(); + async.create().forPath("/one"); + + latch.await(); + + for ( long elapsed : times.subList(1, times.size()) ) // first one isn't a retry + { + Assert.assertTrue(elapsed >= SLEEP, elapsed + ": " + times); + } + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + /** + * Attempt a background operation while Zookeeper server is down. + * Return code must be {@link org.apache.zookeeper.KeeperException.Code#CONNECTIONLOSS} + */ + @Test + public void testCuratorCallbackOnError() throws Exception + { + Timing timing = new Timing(); + final CountDownLatch latch = new CountDownLatch(1); + try ( CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1000)).build() ) + { + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + // Stop the Zookeeper server + server.stop(); + // Attempt to retrieve children list + async.getChildren().forPath("/").handle((children, e) -> { + if ( e instanceof KeeperException.ConnectionLossException ) + { + latch.countDown(); + } + return null; + }); + // Check if the callback has been called with a correct return code + Assert.assertTrue(timing.awaitLatch(latch), "Callback has not been called by curator !"); + } + } + + /** + * CURATOR-126 + * Shutdown the Curator client while there are still background operations running. + */ + @Test + public void testShutdown() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory + .builder() + .connectString(server.getConnectString()) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)) + .maxCloseWaitMs(timing.forWaiting().milliseconds()) + .build(); + try + { + final AtomicBoolean hadIllegalStateException = new AtomicBoolean(false); + ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = (message, e) -> + { + if ( e instanceof IllegalStateException ) + { + hadIllegalStateException.set(true); + } + }; + client.start(); + AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client); + + final CountDownLatch operationReadyLatch = new CountDownLatch(1); + ((CuratorFrameworkImpl)client).debugListener = data -> + { + try + { + operationReadyLatch.await(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + }; + + // queue a background operation that will block due to the debugListener + async.create().forPath("/hey"); + timing.sleepABit(); + + // close the client while the background is still blocked + client.close(); + + // unblock the background + operationReadyLatch.countDown(); + timing.sleepABit(); + + // should not generate an exception + Assert.assertFalse(hadIllegalStateException.get()); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } +}