Introduced CloseableExecutorService. Instead of blindly shutting down exectors, 
this container
shuts down any futures created by an executor. This resolves issues where 
custom executors
are given to Curator.

Merge branch 'CURATOR-17' into 2.0.1-incubating

Conflicts:
        
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/6e3c9e27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/6e3c9e27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/6e3c9e27

Branch: refs/heads/master
Commit: 6e3c9e27b8e6e2cda6ad146b8a63a60346badb0d
Parents: 11ae23a 601bc4c
Author: randgalt <randg...@apache.org>
Authored: Fri May 10 19:11:23 2013 -0700
Committer: randgalt <randg...@apache.org>
Committed: Fri May 10 19:11:23 2013 -0700

----------------------------------------------------------------------
 .../curator/utils/CloseableExecutorService.java    |  123 +++++++
 .../utils/CloseableScheduledExecutorService.java   |   72 ++++
 .../utils/TestCloseableExecutorService.java        |  252 +++++++++++++++
 .../framework/recipes/cache/PathChildrenCache.java |    9 +-
 .../framework/recipes/locks/ChildReaper.java       |    9 +-
 .../curator/framework/recipes/locks/Reaper.java    |   25 +-
 .../recipes/cache/TestPathChildrenCache.java       |   27 +--
 .../framework/recipes/locks/TestReaper.java        |   55 ++--
 8 files changed, 502 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --cc 
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 76efc6c,61c3af7..ec2d328
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@@ -350,11 -355,10 +351,11 @@@ public class PathChildrenCache implemen
      @Override
      public void close() throws IOException
      {
 -        Preconditions.checkState(!executorService.isShutdown(), "has not been 
started");
 -
 -        
client.getConnectionStateListenable().removeListener(connectionStateListener);
 -        executorService.close();
 +        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
 +        {
 +            
client.getConnectionStateListenable().removeListener(connectionStateListener);
-             executorService.shutdownNow();
++            executorService.close();
 +        }
      }
  
      /**
@@@ -751,24 -749,9 +752,24 @@@
          }
      }
  
 -    private void offerOperation(Operation operation)
 +    /**
 +     * Submits a runnable to the executor.
 +     * <p/>
 +     * This method is synchronized because it has to check state about 
whether this instance is still open.  Without this check
 +     * there is a race condition with the dataWatchers that get set.  Even 
after this object is closed() it can still be
 +     * called by those watchers, because the close() method cannot actually 
disable the watcher.
 +     * <p/>
 +     * The synchronization overhead should be minimal if non-existant as this 
is generally only called from the
 +     * ZK client thread and will only contend if close() is called in 
parallel with an update, and that's the exact state
 +     * we want to protect from.
 +     *
 +     * @param command The runnable to run
 +     */
 +    private synchronized void submitToExecutor(final Runnable command)
      {
 -        operations.remove(operation);   // avoids herding for refresh 
operations
 -        operations.offer(operation);
 +        if ( state.get() == State.STARTED )
 +        {
-             executorService.execute(command);
++            executorService.submit(command);
 +        }
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/6e3c9e27/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
----------------------------------------------------------------------
diff --cc 
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 7fed5f8,e51125b..4b117fb
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@@ -730,253 -728,4 +729,229 @@@ public class TestPathChildrenCache exte
              client.close();
          }
      }
 +
 +    @Test
 +    public void testBasicsOnTwoCachesWithSameExecutor() throws Exception
 +    {
 +        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
 +        client.start();
 +        try
 +        {
 +            client.create().forPath("/test");
 +
 +            final BlockingQueue<PathChildrenCacheEvent.Type> events = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
-             final ExecutorService exec = new 
ShutdownNowIgnoringExecutorService(Executors.newSingleThreadExecutor());
++            final ExecutorService exec = Executors.newSingleThreadExecutor();
 +            PathChildrenCache cache = new PathChildrenCache(client, "/test", 
true, false, exec);
 +            cache.getListenable().addListener
 +                (
 +                    new PathChildrenCacheListener()
 +                    {
 +                        @Override
 +                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
 +                        {
 +                            if ( 
event.getData().getPath().equals("/test/one") )
 +                            {
 +                                events.offer(event.getType());
 +                            }
 +                        }
 +                    }
 +                );
 +            cache.start();
 +
 +            final BlockingQueue<PathChildrenCacheEvent.Type> events2 = new 
LinkedBlockingQueue<PathChildrenCacheEvent.Type>();
 +            PathChildrenCache cache2 = new PathChildrenCache(client, "/test", 
true, false, exec);
 +            cache2.getListenable().addListener(
 +                    new PathChildrenCacheListener() {
 +                        @Override
 +                        public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event)
 +                                throws Exception
 +                        {
 +                            if ( 
event.getData().getPath().equals("/test/one") )
 +                            {
 +                                events2.offer(event.getType());
 +                            }
 +                        }
 +                    }
 +            );
 +            cache2.start();
 +
 +            client.create().forPath("/test/one", "hey there".getBytes());
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_ADDED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_ADDED);
 +
 +            client.setData().forPath("/test/one", "sup!".getBytes());
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_UPDATED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_UPDATED);
 +            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "sup!");
 +            Assert.assertEquals(new 
String(cache2.getCurrentData("/test/one").getData()), "sup!");
 +
 +            client.delete().forPath("/test/one");
 +            Assert.assertEquals(events.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_REMOVED);
 +            Assert.assertEquals(events2.poll(10, TimeUnit.SECONDS), 
PathChildrenCacheEvent.Type.CHILD_REMOVED);
 +
 +            cache.close();
 +            cache2.close();
 +        }
 +        finally
 +        {
 +            client.close();
 +        }
 +    }
 +
 +    @Test
 +    public void testDeleteNodeAfterCloseDoesntCallExecutor()
 +            throws Exception
 +    {
 +        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
 +        client.start();
 +        try
 +        {
 +            client.create().forPath("/test");
 +
 +            final ExecuteCalledWatchingExecutorService exec = new 
ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor());
 +            PathChildrenCache cache = new PathChildrenCache(client, "/test", 
true, false, exec);
 +
 +            cache.start();
 +            client.create().forPath("/test/one", "hey there".getBytes());
 +
 +            cache.rebuild();
 +            Assert.assertEquals(new 
String(cache.getCurrentData("/test/one").getData()), "hey there");
 +            Assert.assertTrue(exec.isExecuteCalled());
 +
 +            exec.setExecuteCalled(false);
 +            cache.close();
 +            Assert.assertFalse(exec.isExecuteCalled());
 +
 +            client.delete().forPath("/test/one");
 +            Thread.sleep(100);
 +            Assert.assertFalse(exec.isExecuteCalled());
 +        }
 +        finally {
 +            client.close();
 +        }
 +
 +    }
 +
 +    public static class ExecuteCalledWatchingExecutorService extends 
DelegatingExecutorService
 +    {
 +        boolean executeCalled = false;
 +
 +        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
 +        {
 +            super(delegate);
 +        }
 +
 +        @Override
 +        public synchronized void execute(Runnable command)
 +        {
 +            executeCalled = true;
 +            super.execute(command);
 +        }
 +
 +        public synchronized boolean isExecuteCalled()
 +        {
 +            return executeCalled;
 +        }
 +
 +        public synchronized void setExecuteCalled(boolean executeCalled)
 +        {
 +            this.executeCalled = executeCalled;
 +        }
 +    }
 +
-     /**
-      * This is required to work around 
https://issues.apache.org/jira/browse/CURATOR-17
-      */
-     public static class ShutdownNowIgnoringExecutorService extends 
DelegatingExecutorService
-     {
-         public ShutdownNowIgnoringExecutorService(ExecutorService delegate)
-         {
-             super(delegate);
-         }
- 
-         @Override
-         public void shutdown()
-         {
-             // ignore
-         }
- 
-         @Override
-         public List<Runnable> shutdownNow()
-         {
-             // ignore
-             return ImmutableList.of();
-         }
-     }
- 
 +    public static class DelegatingExecutorService implements ExecutorService
 +    {
 +        private final ExecutorService delegate;
 +
 +        public DelegatingExecutorService(
 +                ExecutorService delegate
 +        )
 +        {
 +            this.delegate = delegate;
 +        }
 +
 +
 +        @Override
 +        public void shutdown()
 +        {
 +            delegate.shutdown();
 +        }
 +
 +        @Override
 +        public List<Runnable> shutdownNow()
 +        {
 +            return delegate.shutdownNow();
 +        }
 +
 +        @Override
 +        public boolean isShutdown()
 +        {
 +            return delegate.isShutdown();
 +        }
 +
 +        @Override
 +        public boolean isTerminated()
 +        {
 +            return delegate.isTerminated();
 +        }
 +
 +        @Override
 +        public boolean awaitTermination(long timeout, TimeUnit unit)
 +                throws InterruptedException
 +        {
 +            return delegate.awaitTermination(timeout, unit);
 +        }
 +
 +        @Override
 +        public <T> Future<T> submit(Callable<T> task)
 +        {
 +            return delegate.submit(task);
 +        }
 +
 +        @Override
 +        public <T> Future<T> submit(Runnable task, T result)
 +        {
 +            return delegate.submit(task, result);
 +        }
 +
 +        @Override
 +        public Future<?> submit(Runnable task)
 +        {
 +            return delegate.submit(task);
 +        }
 +
 +        @Override
 +        public <T> List<Future<T>> invokeAll(Collection<? extends 
Callable<T>> tasks)
 +                throws InterruptedException
 +        {
 +            return delegate.invokeAll(tasks);
 +        }
 +
 +        @Override
 +        public <T> List<Future<T>> invokeAll(Collection<? extends 
Callable<T>> tasks, long timeout, TimeUnit unit)
 +                throws InterruptedException
 +        {
 +            return delegate.invokeAll(tasks, timeout, unit);
 +        }
 +
 +        @Override
 +        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 +                throws InterruptedException, ExecutionException
 +        {
 +            return delegate.invokeAny(tasks);
 +        }
 +
 +        @Override
 +        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
 +                throws InterruptedException, ExecutionException, 
TimeoutException
 +        {
 +            return delegate.invokeAny(tasks, timeout, unit);
 +        }
 +
 +        @Override
 +        public void execute(Runnable command)
 +        {
 +            delegate.execute(command);
 +        }
 +    }
  }

Reply via email to