Updated Branches:
  refs/heads/master 9eff1cf0e -> 6ac0133ce

CURATOR-21
PathChildrenCache currently takes an Executor, but only to fire off a Runnable 
that runs a blocking while loop waiting for work. This means that you must 
dedicate one thread to each PathChildrenCache, which scales poorly.

PathChildrenCache should just use the Executor's work queuing mechanism to 
enqueue work items instead of maintaining its own work queue mechanism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/75b54041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/75b54041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/75b54041

Branch: refs/heads/master
Commit: 75b540414f860446f106a1cdbdea121046be2e98
Parents: 97cda39
Author: randgalt <randg...@apache.org>
Authored: Sun May 5 07:20:02 2013 -0700
Committer: randgalt <randg...@apache.org>
Committed: Sun May 5 07:20:02 2013 -0700

----------------------------------------------------------------------
 .../apache/curator/framework/CuratorFramework.java |    3 +
 .../framework/recipes/cache/PathChildrenCache.java |  132 +++++----
 .../recipes/cache/PathChildrenCacheEvent.java      |   21 ++
 .../recipes/cache/TestPathChildrenCache.java       |  251 +++++++++++++++
 4 files changed, 350 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 7938b2e..0ca9b23 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -74,6 +74,9 @@ public interface CuratorFramework extends Closeable
     /**
      * Start an exists builder
      *
+     * The builder will return a Stat object as if 
org.apache.zookeeper.ZooKeeper.exists() were called.  Thus, a null
+     * means that it does not exist and an actual Stat object means it does 
exist.
+     *
      * @return builder object
      */
     public ExistsBuilder checkExists();

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 9b25001..76efc6c 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.cache;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -46,12 +47,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -74,12 +73,20 @@ public class PathChildrenCache implements Closeable
     private final boolean cacheData;
     private final boolean dataIsCompressed;
     private final EnsurePath ensurePath;
-    private final BlockingQueue<Operation> operations = new 
LinkedBlockingQueue<Operation>();
     private final ListenerContainer<PathChildrenCacheListener> listeners = new 
ListenerContainer<PathChildrenCacheListener>();
     private final ConcurrentMap<String, ChildData> currentData = 
Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new 
AtomicReference<Map<String, ChildData>>();
+    private final Set<Operation> operationsQuantizer = 
Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
+    private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
 
-    private static final ChildData      NULL_CHILD_DATA = new ChildData(null, 
null, null);
+    private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, 
null);
 
     private final Watcher childrenWatcher = new Watcher()
     {
@@ -217,8 +224,8 @@ public class PathChildrenCache implements Closeable
      * @param buildInitial if true, {@link #rebuild()} will be called before 
this method
      *                     returns in order to get an initial view of the 
node; otherwise,
      *                     the cache will be initialized asynchronously
-     * @deprecated use {@link #start(StartMode)}
      * @throws Exception errors
+     * @deprecated use {@link #start(StartMode)}
      */
     public void start(boolean buildInitial) throws Exception
     {
@@ -257,21 +264,10 @@ public class PathChildrenCache implements Closeable
      */
     public void start(StartMode mode) throws Exception
     {
-        Preconditions.checkState(!executorService.isShutdown(), "already 
started");
+        Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "already started");
         mode = Preconditions.checkNotNull(mode, "mode cannot be null");
 
         
client.getConnectionStateListenable().addListener(connectionStateListener);
-        executorService.execute
-            (
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        mainLoop();
-                    }
-                }
-            );
 
         switch ( mode )
         {
@@ -354,10 +350,11 @@ public class PathChildrenCache implements Closeable
     @Override
     public void close() throws IOException
     {
-        Preconditions.checkState(!executorService.isShutdown(), "has not been 
started");
-
-        
client.getConnectionStateListenable().removeListener(connectionStateListener);
-        executorService.shutdownNow();
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            
client.getConnectionStateListenable().removeListener(connectionStateListener);
+            executorService.shutdownNow();
+        }
     }
 
     /**
@@ -623,17 +620,17 @@ public class PathChildrenCache implements Closeable
     private void processChildren(List<String> children, RefreshMode mode) 
throws Exception
     {
         List<String> fullPaths = Lists.newArrayList(Lists.transform
-        (
-            children,
-            new Function<String, String>()
-            {
-                @Override
-                public String apply(String child)
+            (
+                children,
+                new Function<String, String>()
                 {
-                    return ZKPaths.makePath(path, child);
+                    @Override
+                    public String apply(String child)
+                    {
+                        return ZKPaths.makePath(path, child);
+                    }
                 }
-            }
-        ));
+            ));
         Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
         removedNodes.removeAll(fullPaths);
 
@@ -714,43 +711,64 @@ public class PathChildrenCache implements Closeable
         }
 
         Map<String, ChildData> uninitializedChildren = Maps.filterValues
-        (
-            localInitialSet,
-            new Predicate<ChildData>()
-            {
-                @Override
-                public boolean apply(ChildData input)
+            (
+                localInitialSet,
+                new Predicate<ChildData>()
                 {
-                    return (input == NULL_CHILD_DATA);  // check against ref 
intentional
+                    @Override
+                    public boolean apply(ChildData input)
+                    {
+                        return (input == NULL_CHILD_DATA);  // check against 
ref intentional
+                    }
                 }
-            }
-        );
+            );
         return (uninitializedChildren.size() != 0);
     }
 
-    private void mainLoop()
+    private void offerOperation(final Operation operation)
     {
-        while ( !Thread.currentThread().isInterrupted() )
+        if ( operationsQuantizer.add(operation) )
         {
-            try
-            {
-                operations.take().invoke();
-            }
-            catch ( InterruptedException e )
-            {
-                Thread.currentThread().interrupt();
-                break;
-            }
-            catch ( Exception e )
-            {
-                handleException(e);
-            }
+            submitToExecutor
+            (
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            operationsQuantizer.remove(operation);
+                            operation.invoke();
+                        }
+                        catch ( Exception e )
+                        {
+                            handleException(e);
+                        }
+                    }
+                }
+            );
         }
     }
 
-    private void offerOperation(Operation operation)
+    /**
+     * Submits a runnable to the executor.
+     * <p/>
+     * This method is synchronized because it has to check state about whether 
this instance is still open.  Without this check
+     * there is a race condition with the dataWatchers that get set.  Even 
after this object is closed() it can still be
+     * called by those watchers, because the close() method cannot actually 
disable the watcher.
+     * <p/>
+     * The synchronization overhead should be minimal, if not non-existent, as this 
is generally only called from the
+     * ZK client thread and will only contend if close() is called in parallel 
with an update, and that's the exact state
+     * we want to protect from.
+     *
+     * @param command The runnable to run
+     */
+    private synchronized void submitToExecutor(final Runnable command)
     {
-        operations.remove(operation);   // avoids herding for refresh 
operations
-        operations.offer(operation);
+        if ( state.get() == State.STARTED )
+        {
+            executorService.execute(command);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
index e85d509..f1c10f8 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheEvent.java
@@ -51,16 +51,37 @@ public class PathChildrenCacheEvent
 
         /**
          * Called when the connection has changed to {@link 
ConnectionState#SUSPENDED}
+         *
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not 
expected to do anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue 
to have its state from before it lost
+         * the connection and after the connection is restored, the 
PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
          */
         CONNECTION_SUSPENDED,
 
         /**
          * Called when the connection has changed to {@link 
ConnectionState#RECONNECTED}
+         *
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not 
expected to do anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue 
to have its state from before it lost
+         * the connection and after the connection is restored, the 
PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
          */
         CONNECTION_RECONNECTED,
 
         /**
          * Called when the connection has changed to {@link 
ConnectionState#LOST}
+         *
+         * This is exposed so that users of the class can be notified of 
issues that *might* affect normal operation.
+         * The PathChildrenCache is written such that listeners are not 
expected to do anything special on this
+         * event, except for those people who want to cause some 
application-specific logic to fire when this occurs.
+         * While the connection is down, the PathChildrenCache will continue 
to have its state from before it lost
+         * the connection and after the connection is restored, the 
PathChildrenCache will emit normal child events
+         * for all of the adds, deletes and updates that happened during the 
time that it was disconnected.
          */
         CONNECTION_LOST,
 

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/75b54041/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index e51125b..7fed5f8 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import org.apache.curator.framework.CuratorFramework;
@@ -31,6 +32,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -728,4 +730,253 @@ public class TestPathChildrenCache extends 
BaseClassForTests
             client.close();
         }
     }
+
+    @Test
+    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            final ExecutorService exec = new 
ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", 
true, false, exec);
+            cache.getListenable().addListener
+                (
+                    new PathChildrenCacheListener()
+                    {
+                        @Override
+                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") 
)
+                            {
+                                events.offer(event.getType());
+                            }
+                        }
+                    }
+                );
+            cache.start();
+
+            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
+            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", 
true, false, exec);
+            cache2.getListenable().addListener(
+                    new PathChildrenCacheListener() {
+                        @Override
+                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event)
+                                throws Exception
+                        {
+                            if ( event.getData().getPath().equals("/test/one") 
)
+                            {
+                                events2.offer(event.getType());
+                            }
+                        }
+                    }
+            );
+            cache2.start();
+
+            client.create().forPath("/test/one", "hey there".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_ADDED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+            client.setData().forPath("/test/one", "sup!".getBytes());
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_UPDATED);
+            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "sup!");
+            Assert.assertEquals(new 
String(cache2.getCurrentData("/test/one").getData()), "sup!");
+
+            client.delete().forPath("/test/one");
+            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_REMOVED);
+
+            cache.close();
+            cache2.close();
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    @Test
+    public void testDeleteNodeAfterCloseDoesntCallExecutor()
+            throws Exception
+    {
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        client.start();
+        try
+        {
+            client.create().forPath("/test");
+
+            final ExecuteCalledWatchingExecutorService exec = new 
ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
+            PathChildrenCache cache = new PathChildrenCache(client, "/test", 
true, false, exec);
+
+            cache.start();
+            client.create().forPath("/test/one", "hey there".getBytes());
+
+            cache.rebuild();
+            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
+            Assert.assertTrue(exec.isExecuteCalled());
+
+            exec.setExecuteCalled(false);
+            cache.close();
+            Assert.assertFalse(exec.isExecuteCalled());
+
+            client.delete().forPath("/test/one");
+            Thread.sleep(100);
+            Assert.assertFalse(exec.isExecuteCalled());
+        }
+        finally {
+            client.close();
+        }
+
+    }
+
+    public static class ExecuteCalledWatchingExecutorService extends 
DelegatingExecutorService
+    {
+        boolean executeCalled = false;
+
+        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public synchronized void execute(Runnable command)
+        {
+            executeCalled = true;
+            super.execute(command);
+        }
+
+        public synchronized boolean isExecuteCalled()
+        {
+            return executeCalled;
+        }
+
+        public synchronized void setExecuteCalled(boolean executeCalled)
+        {
+            this.executeCalled = executeCalled;
+        }
+    }
+
+    /**
+     * This is required to work around 
https://issues.apache.org/jira/browse/CURATOR-17
+     */
+    public static class ShutdownNowIgnoringExecutorService extends 
DelegatingExecutorService
+    {
+        public ShutdownNowIgnoringExecutorService(ExecutorService delegate)
+        {
+            super(delegate);
+        }
+
+        @Override
+        public void shutdown()
+        {
+            // ignore
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            // ignore
+            return ImmutableList.of();
+        }
+    }
+
+    public static class DelegatingExecutorService implements ExecutorService
+    {
+        private final ExecutorService delegate;
+
+        public DelegatingExecutorService(
+                ExecutorService delegate
+        )
+        {
+            this.delegate = delegate;
+        }
+
+
+        @Override
+        public void shutdown()
+        {
+            delegate.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow()
+        {
+            return delegate.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown()
+        {
+            return delegate.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated()
+        {
+            return delegate.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result)
+        {
+            return delegate.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task)
+        {
+            return delegate.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> 
tasks, long timeout, TimeUnit unit)
+                throws InterruptedException
+        {
+            return delegate.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException, ExecutionException
+        {
+            return delegate.invokeAny(tasks);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, 
TimeoutException
+        {
+            return delegate.invokeAny(tasks, timeout, unit);
+        }
+
+        @Override
+        public void execute(Runnable command)
+        {
+            delegate.execute(command);
+        }
+    }
 }

Reply via email to