Repository: curator
Updated Branches:
  refs/heads/persistent-watch 01652cef6 -> bf73f0d39


http://git-wip-us.apache.org/repos/asf/curator/blob/bf73f0d3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
new file mode 100644
index 0000000..4bc423a
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestSingleLevelCuratorCache.java
@@ -0,0 +1,961 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.watch;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.TestCleanState;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.testng.AssertJUnit.assertNotNull;
+
+public class TestSingleLevelCuratorCache extends BaseClassForTests
+{
+    private static final Timing timing = new Timing();
+
+    @Test
+    public void testWithBadConnect() throws Exception
+    {
+        final int serverPort = server.getPort();
+        server.close();
+
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), 1000, 1000, new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CuratorCache cache = CuratorCacheBuilder.builder(client, 
"/").forSingleLevel().build();
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            CacheListener listener = new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( (event == CacheEvent.NODE_CREATED) && 
path.equals("/baz"))
+                    {
+                        addedLatch.countDown();
+                    }
+                }
+            };
+            cache.getListenable().addListener(listener);
+            cache.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new 
ConnectionStateListener()
+            {
+                
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    if(newState == ConnectionState.CONNECTED)
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+
+            server = new TestingServer(serverPort, true);
+            
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+            client.create().creatingParentContainersIfNeeded().forPath("/baz", 
new byte[]{1, 2, 3});
+
+            assertNotNull("/baz does not exist", 
client.checkExists().forPath("/baz"));
+
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+            assertNotNull("cache doesn't see /baz", 
cache.get("/baz").getData());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testPostInitializedForEmpty() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().build();
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( event == CacheEvent.CACHE_REFRESHED )
+                    {
+                        latch.countDown();
+                    }
+                }
+            });
+            cache.start();
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testAsyncInitialPopulation() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            client.create().forPath("/test");
+            client.create().forPath("/test/one", "hey there".getBytes());
+
+            final BlockingQueue<CacheEvent> events = new 
LinkedBlockingQueue<>();
+            cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().sendingRefreshEvents(false).build();
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    events.offer(event);
+                }
+            });
+            cache.start();
+            CountDownLatch latch = cache.refreshAll();
+
+            CacheEvent event = events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            Assert.assertEquals(event, CacheEvent.NODE_CREATED);
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testChildrenInitialized() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().build();
+
+            final CountDownLatch addedLatch = new CountDownLatch(3);
+            final CountDownLatch initLatch = new CountDownLatch(1);
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        addedLatch.countDown();
+                    }
+                    else if ( event == CacheEvent.CACHE_REFRESHED )
+                    {
+                        initLatch.countDown();
+                    }
+                }
+            });
+
+            client.create().forPath("/test/1", "1".getBytes());
+            client.create().forPath("/test/2", "2".getBytes());
+            client.create().forPath("/test/3", "3".getBytes());
+
+            cache.start();
+
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+            Assert.assertTrue(timing.awaitLatch(initLatch));
+            Assert.assertEquals(cache.size(), 3);
+            Assert.assertEquals(cache.get("/test/1").getData(), 
"1".getBytes());
+            Assert.assertEquals(cache.get("/test/2").getData(), 
"2".getBytes());
+            Assert.assertEquals(cache.get("/test/3").getData(), 
"3".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testChildrenInitializedNormal() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().forPath("/test");
+
+            cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().sendingRefreshEvents(false).build();
+
+            final CountDownLatch addedLatch = new CountDownLatch(3);
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    Assert.assertNotEquals(event, CacheEvent.CACHE_REFRESHED);
+                    if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        addedLatch.countDown();
+                    }
+                }
+            });
+
+            client.create().forPath("/test/1", "1".getBytes());
+            client.create().forPath("/test/2", "2".getBytes());
+            client.create().forPath("/test/3", "3".getBytes());
+
+            cache.start();
+
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+            Assert.assertEquals(cache.size(), 3);
+            Assert.assertEquals(cache.get("/test/1").getData(), 
"1".getBytes());
+            Assert.assertEquals(cache.get("/test/2").getData(), 
"2".getBytes());
+            Assert.assertEquals(cache.get("/test/3").getData(), 
"3".getBytes());
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testUpdateWhenNotCachingData() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final CountDownLatch updatedLatch = new CountDownLatch(1);
+            final CountDownLatch addedLatch = new CountDownLatch(1);
+            client.create().creatingParentsIfNeeded().forPath("/test");
+            SingleLevelCacheFilter cacheFilter = new 
SingleLevelCacheFilter("/test", CacheAction.PATH_ONLY);
+            cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().withCacheFilter(cacheFilter).build();
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    if ( event == CacheEvent.NODE_CHANGED )
+                    {
+                        updatedLatch.countDown();
+                    }
+                    else if ( event == CacheEvent.NODE_CREATED )
+                    {
+                        addedLatch.countDown();
+                    }
+                }
+            });
+            cache.start();
+
+            client.create().forPath("/test/foo", "first".getBytes());
+            Assert.assertTrue(timing.awaitLatch(addedLatch));
+
+            client.setData().forPath("/test/foo", "something new".getBytes());
+            Assert.assertTrue(timing.awaitLatch(updatedLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    @Test
+    public void testDeleteThenCreate() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+            client.create().forPath("/test/foo", "one".getBytes());
+
+            final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
+            client.getUnhandledErrorListenable().addListener
+                (
+                    new UnhandledErrorListener()
+                    {
+                        @Override
+                        public void unhandledError(String message, Throwable e)
+                        {
+                            error.set(e);
+                        }
+                    }
+                );
+
+            final CountDownLatch initializedLatch = new CountDownLatch(1);
+            final CountDownLatch removedLatch = new CountDownLatch(1);
+            final CountDownLatch postRemovedLatch = new CountDownLatch(1);
+            final CountDownLatch dataLatch = new CountDownLatch(1);
+            try ( CuratorCache cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().build() )
+            {
+                cache.getListenable().addListener(new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        if ( event == CacheEvent.CACHE_REFRESHED )
+                        {
+                            initializedLatch.countDown();
+                        }
+                        else if ( initializedLatch.getCount() == 0 )
+                        {
+                            if ( event == CacheEvent.NODE_DELETED )
+                            {
+                                removedLatch.countDown();
+                                
Assert.assertTrue(timing.awaitLatch(postRemovedLatch));
+                            }
+                            else
+                            {
+                                try
+                                {
+                                    
Assert.assertEquals(cache.get(path).getData(), "two".getBytes());
+                                }
+                                finally
+                                {
+                                    dataLatch.countDown();
+                                }
+                            }
+                        }
+                    }
+                });
+                cache.start();
+                Assert.assertTrue(timing.awaitLatch(initializedLatch));
+
+                client.delete().forPath("/test/foo");
+                Assert.assertTrue(timing.awaitLatch(removedLatch));
+                client.create().forPath("/test/foo", "two".getBytes());
+                postRemovedLatch.countDown();
+                Assert.assertTrue(timing.awaitLatch(dataLatch));
+
+                Throwable t = error.get();
+                if ( t != null )
+                {
+                    Assert.fail("Assert", t);
+                }
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testRebuildAgainstOtherProcesses() throws Exception
+    {
+        final CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+            client.create().forPath("/test/foo");
+            client.create().forPath("/test/bar");
+            client.create().forPath("/test/snafu", "original".getBytes());
+
+            final CountDownLatch addedLatch = new CountDownLatch(2);
+            try ( final CuratorCache cache = 
CuratorCacheBuilder.builder(client, "/test").forSingleLevel().build() )
+            {
+                cache.getListenable().addListener(new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        if ( event == CacheEvent.NODE_CREATED )
+                        {
+                            if ( path.equals("/test/test") )
+                            {
+                                addedLatch.countDown();
+                            }
+                        }
+                        else if ( event == CacheEvent.NODE_CHANGED )
+                        {
+                            if ( path.equals("/test/snafu") )
+                            {
+                                addedLatch.countDown();
+                            }
+                        }
+                    }
+                });
+                ((InternalCuratorCache)cache).rebuildTestExchanger = new 
Exchanger<Object>();
+                ExecutorService service = Executors.newSingleThreadExecutor();
+                final AtomicReference<String> deletedPath = new 
AtomicReference<String>();
+                Future<Object> future = service.submit
+                    (
+                        new Callable<Object>()
+                        {
+                            @Override
+                            public Object call() throws Exception
+                            {
+                                
((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                // simulate another process adding a node 
while we're rebuilding
+                                client.create().forPath("/test/test");
+
+                                Assert.assertTrue(cache.size() > 0);
+
+                                // simulate another process removing a node 
while we're rebuilding
+                                client.delete().forPath("/test/bar");
+                                deletedPath.set("/test/bar");
+
+                                
((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                CachedNode cachedNode = null;
+                                while ( cachedNode == null )
+                                {
+                                    cachedNode = cache.get("/test/snafu");
+                                    timing.sleepABit();
+                                }
+                                Assert.assertEquals(cachedNode.getData(), 
"original".getBytes());
+                                client.setData().forPath("/test/snafu", 
"grilled".getBytes());
+
+                                
((InternalCuratorCache)cache).rebuildTestExchanger.exchange(new Object());
+
+                                return null;
+                            }
+                        }
+                    );
+                cache.start();
+                future.get();
+
+                Assert.assertTrue(timing.awaitLatch(addedLatch));
+                Assert.assertNotNull(cache.get("/test/test"));
+                Assert.assertNull(cache.get(deletedPath.get()));
+                Assert.assertEquals(cache.get("/test/snafu").getData(), 
"grilled".getBytes());
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    // see https://github.com/Netflix/curator/issues/27 - was caused by not 
comparing old->new data
+    @Test
+    public void testIssue27() throws Exception
+    {
+        CuratorCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/base");
+            client.create().forPath("/base/a");
+            client.create().forPath("/base/b");
+            client.create().forPath("/base/c");
+
+            client.getChildren().forPath("/base");
+
+            final List<CacheEvent> events = Lists.newArrayList();
+            final Semaphore semaphore = new Semaphore(0);
+            cache = CuratorCacheBuilder.builder(client, 
"/base").forSingleLevel().sendingRefreshEvents(false).build();
+            cache.getListenable().addListener(new CacheListener()
+            {
+                @Override
+                public void process(CacheEvent event, String path)
+                {
+                    events.add(event);
+                    semaphore.release();
+                }
+            });
+            cache.start();
+
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 3));
+
+            client.delete().forPath("/base/a");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+            client.create().forPath("/base/a");
+            Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+            List<CacheEvent> expected = Lists.newArrayList
+                (
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_CREATED,
+                    CacheEvent.NODE_DELETED,
+                    CacheEvent.NODE_CREATED
+                );
+            Assert.assertEquals(expected, events);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    // test Issue 27 using new rebuild() method
+    @Test
+    public void testIssue27Alt() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/base");
+            client.create().forPath("/base/a");
+            client.create().forPath("/base/b");
+            client.create().forPath("/base/c");
+
+            client.getChildren().forPath("/base");
+
+            final List<CacheEvent> events = Lists.newArrayList();
+            final Semaphore semaphore = new Semaphore(0);
+            try ( final CuratorCache cache = 
CuratorCacheBuilder.builder(client, 
"/base").forSingleLevel().sendingRefreshEvents(false).build() )
+            {
+                cache.getListenable().addListener(new CacheListener()
+                {
+                    @Override
+                    public void process(CacheEvent event, String path)
+                    {
+                        if ( cache.refreshCount() > 0 )
+                        {
+                            events.add(event);
+                            semaphore.release();
+                        }
+                    }
+                });
+                Assert.assertTrue(timing.awaitLatch(cache.start()));
+
+                client.delete().forPath("/base/a");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+                client.create().forPath("/base/a");
+                Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
+
+                List<CacheEvent> expected = 
Lists.newArrayList(CacheEvent.NODE_DELETED, CacheEvent.NODE_CREATED);
+                Assert.assertEquals(expected, events);
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testKilledSession() throws Exception
+    {
+        PathChildrenCache cache = null;
+        CuratorFramework client = null;
+        try
+        {
+            client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+            client.start();
+            client.create().forPath("/test");
+
+            cache = new PathChildrenCache(client, "/test", true);
+            cache.start();
+
+            final CountDownLatch childAddedLatch = new CountDownLatch(1);
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final CountDownLatch removedLatch = new CountDownLatch(1);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
+                    {
+                        @Override
+                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
+                        {
+                            if ( event.getType() == 
PathChildrenCacheEvent.Type.CHILD_ADDED )
+                            {
+                                childAddedLatch.countDown();
+                            }
+                            else if ( event.getType() == 
PathChildrenCacheEvent.Type.CONNECTION_LOST )
+                            {
+                                lostLatch.countDown();
+                            }
+                            else if ( event.getType() == 
PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED )
+                            {
+                                reconnectedLatch.countDown();
+                            }
+                            else if ( event.getType() == 
PathChildrenCacheEvent.Type.CHILD_REMOVED )
+                            {
+                                removedLatch.countDown();
+                            }
+                        }
+                    }
+                );
+
+            client.create().withMode(CreateMode.EPHEMERAL).forPath("/test/me", 
"data".getBytes());
+            Assert.assertTrue(timing.awaitLatch(childAddedLatch));
+
+            KillSession.kill(client.getZookeeperClient().getZooKeeper());
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+            Assert.assertTrue(timing.awaitLatch(removedLatch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testModes() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            for ( boolean cacheData : new boolean[]{false, true} )
+            {
+                internalTestMode(client, cacheData);
+
+                client.delete().forPath("/test/one");
+                client.delete().forPath("/test/two");
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testRebuildNode() throws Exception
+    {
+        PathChildrenCache cache = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/test/one", 
"one".getBytes());
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            final AtomicInteger counter = new AtomicInteger();
+            final Semaphore semaphore = new Semaphore(1);
+            cache = new PathChildrenCache(client, "/test", true)
+            {
+                //@Override
+                void getDataAndStat(String fullPath) throws Exception
+                {
+                    semaphore.acquire();
+                    counter.incrementAndGet();
+                    //super.getDataAndStat(fullPath);
+                    latch.countDown();
+                }
+            };
+            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            int saveCounter = counter.get();
+            client.setData().forPath("/test/one", "alt".getBytes());
+            cache.rebuildNode("/test/one");
+            Assert.assertEquals(cache.getCurrentData("/test/one").getData(), 
"alt".getBytes());
+            Assert.assertEquals(saveCounter, counter.get());
+
+            semaphore.release(1000);
+            timing.sleepABit();
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    private void internalTestMode(CuratorFramework client, boolean cacheData) 
throws Exception
+    {
+        try ( PathChildrenCache cache = new PathChildrenCache(client, "/test", 
cacheData) )
+        {
+            final CountDownLatch latch = new CountDownLatch(2);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
+                    {
+                        @Override
+                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
+                        {
+                            if ( event.getType() == 
PathChildrenCacheEvent.Type.CHILD_ADDED )
+                            {
+                                latch.countDown();
+                            }
+                        }
+                    }
+                );
+            cache.start();
+
+            client.create().forPath("/test/one", "one".getBytes());
+            client.create().forPath("/test/two", "two".getBytes());
+            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+            for ( ChildData data : cache.getCurrentData() )
+            {
+                if ( cacheData )
+                {
+                    Assert.assertNotNull(data.getData());
+                    Assert.assertNotNull(data.getStat());
+                }
+                else
+                {
+                    Assert.assertNull(data.getData());
+                    Assert.assertNotNull(data.getStat());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBasics() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<CacheEvent> events = new 
LinkedBlockingQueue<>();
+            try ( CuratorCache cache = CuratorCacheBuilder.builder(client, 
"/test").forSingleLevel().build() )
+            {
+                cache.getListenable().addListener
+                (
+                    new CacheListener()
+                    {
+                        @Override
+                        public void process(CacheEvent event, String path)
+                        {
+                            if ( path.equals("/test/one") )
+                            {
+                                events.offer(event);
+                            }
+                        }
+                    }
+                );
+                cache.start();
+
+                client.create().forPath("/test/one", "hey there".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), CacheEvent.NODE_CREATED);
+
+                client.setData().forPath("/test/one", "sup!".getBytes());
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), CacheEvent.NODE_CHANGED);
+                Assert.assertTrue(cache.exists("/test/one"));
+                Assert.assertEquals(new 
String(cache.get("/test/one").getData()), "sup!");
+
+                client.delete().forPath("/test/one");
+                Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), CacheEvent.NODE_DELETED);
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            final ExecutorService exec = Executors.newSingleThreadExecutor();
+            try ( PathChildrenCache cache = new PathChildrenCache(client, 
"/test", true, false, exec) )
+            {
+                cache.getListenable().addListener
+                    (
+                        new PathChildrenCacheListener()
+                        {
+                            @Override
+                            public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
+                            {
+                                if ( 
event.getData().getPath().equals("/test/one") )
+                                {
+                                    events.offer(event.getType());
+                                }
+                            }
+                        }
+                    );
+                cache.start();
+
+                final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+                try ( PathChildrenCache cache2 = new PathChildrenCache(client, 
"/test", true, false, exec) )
+                {
+                    cache2.getListenable().addListener(
+                        new PathChildrenCacheListener()
+                        {
+                            @Override
+                            public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event)
+                                throws Exception
+                            {
+                                if ( 
event.getData().getPath().equals("/test/one") )
+                                {
+                                    events2.offer(event.getType());
+                                }
+                            }
+                        }
+                                                      );
+                    cache2.start();
+
+                    client.create().forPath("/test/one", "hey 
there".getBytes());
+                    
Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+                    
Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+                    client.setData().forPath("/test/one", "sup!".getBytes());
+                    
Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    
Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);
+                    Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "sup!");
+                    Assert.assertEquals(new 
String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+                    client.delete().forPath("/test/one");
+                    
Assert.assertEquals(events.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                    
Assert.assertEquals(events2.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS), PathChildrenCacheEvent.Type.CHILD_REMOVED);
+                }
+            }
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+
+    //@Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor()
+        throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final ExecuteCalledWatchingExecutorService exec = new 
ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            try ( PathChildrenCache cache = new PathChildrenCache(client, 
"/test", true, false, exec) )
+            {
+                cache.start();
+                client.create().forPath("/test/one", "hey there".getBytes());
+
+                cache.rebuild();
+                Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
+                Assert.assertTrue(exec.isExecuteCalled());
+
+                exec.setExecuteCalled(false);
+            }
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            client.delete().forPath("/test/one");
+            timing.sleepABit();
+            Assert.assertFalse(exec.isExecuteCalled());
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+
+    }
+
+    /**
+     * Tests the case where there's an outstanding operation being executed 
when the cache is
+     * shut down. See CURATOR-121, this was causing misleading warning 
messages to be logged.
+     */
+    //@Test
+    public void testInterruptedOperationOnShutdown() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new 
RetryOneTime(1));
+        client.start();
+
+        try
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            try ( final PathChildrenCache cache = new 
PathChildrenCache(client, "/test", false) {
+                @Override
+                protected void handleException(Throwable e)
+                {
+                    latch.countDown();
+                }
+            } )
+            {
+                cache.start();
+
+/*
+                cache.offerOperation(new Operation()
+                {
+
+                    @Override
+                    public void invoke() throws Exception
+                    {
+                        Thread.sleep(5000);
+                    }
+                });
+*/
+
+                Thread.sleep(1000);
+
+            }
+
+            latch.await(5, TimeUnit.SECONDS);
+
+            Assert.assertTrue(latch.getCount() == 1, "Unexpected exception 
occurred");
+        }
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
+        }
+    }
+}

Reply via email to