CURATOR-17 PathChildrenCache.close() calls shutdownNow() on its executor, always. Instead, it Curator (in general) should only close tasks it has started. This is now done via wrapped executors in a new class that tracks tasks started by Curator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/10df9fc2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/10df9fc2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/10df9fc2 Branch: refs/heads/master Commit: 10df9fc24a2c36eac9812af1cc4c10c0bab1ae9b Parents: 86b82ab Author: randgalt <randg...@apache.org> Authored: Mon May 6 13:11:49 2013 -0700 Committer: randgalt <randg...@apache.org> Committed: Mon May 6 13:11:49 2013 -0700 ---------------------------------------------------------------------- .../curator/utils/CloseableExecutorService.java | 111 +++++++++- .../utils/CloseableExecutorServiceBase.java | 124 ----------- .../utils/CloseableScheduledExecutorService.java | 104 ++++------ .../org/apache/curator/utils/FutureContainer.java | 91 -------- .../utils/TestCloseableExecutorService.java | 160 ++++++--------- .../framework/recipes/cache/PathChildrenCache.java | 16 +- .../framework/recipes/locks/ChildReaper.java | 9 +- .../curator/framework/recipes/locks/Reaper.java | 11 +- .../framework/recipes/locks/TestReaper.java | 55 +++--- 9 files changed, 258 insertions(+), 423 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java index cf92ef4..4024d29 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java +++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorService.java @@ -1,28 +1,121 @@ package org.apache.curator.utils; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.Closeable; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** - * Decorates an {@link ExecutorService} such that submitted tasks - * are recorded and can be closed en masse. + * Decoration on an ExecutorService that tracks created futures and provides + * a method to close futures created via this class */ -public class CloseableExecutorService extends CloseableExecutorServiceBase +public class CloseableExecutorService implements Closeable { - private final ListeningExecutorService executorService; + private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap()); + private final ExecutorService executorService; + protected final AtomicBoolean isOpen = new AtomicBoolean(true); + + protected class InternalFutureTask<T> extends FutureTask<T> + { + private final RunnableFuture<T> task; + + InternalFutureTask(RunnableFuture<T> task) + { + super(task, null); + this.task = task; + futures.add(task); + } + + protected void done() + { + futures.remove(task); + } + } /** * @param executorService the service to decorate */ public CloseableExecutorService(ExecutorService executorService) { - this.executorService = MoreExecutors.listeningDecorator(executorService); + this.executorService = executorService; + } + + /** + * Returns <tt>true</tt> if this executor has been shut down. + * + * @return <tt>true</tt> if this executor has been shut down + */ + public boolean isShutdown() + { + return !isOpen.get(); } + @VisibleForTesting + int size() + { + return futures.size(); + } + + /** + * Closes any tasks currently in progress + */ @Override - protected ListeningExecutorService getService() + public void close() { - return executorService; + isOpen.set(false); + Iterator<Future<?>> iterator = futures.iterator(); + while ( iterator.hasNext() ) + { + Future<?> future = iterator.next(); + iterator.remove(); + if ( !future.cancel(true) ) + { + System.err.println("Could not cancel"); + throw new RuntimeException("Could not cancel"); + } + } + } + + /** + * Submits a value-returning task for execution and returns a Future + * representing the pending results of the task. Upon completion, + * this task may be taken or polled. + * + * @param task the task to submit + * @return a future to watch the task + */ + public<V> Future<V> submit(Callable<V> task) + { + Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed"); + + InternalFutureTask<V> futureTask = new InternalFutureTask<V>(new FutureTask<V>(task)); + executorService.execute(futureTask); + return futureTask; + } + + /** + * Submits a Runnable task for execution and returns a Future + * representing that task. Upon completion, this task may be + * taken or polled. + * + * @param task the task to submit + * @return a future to watch the task + */ + public Future<?> submit(Runnable task) + { + Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed"); + + InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null)); + executorService.execute(futureTask); + return futureTask; } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java deleted file mode 100644 index 92371d7..0000000 --- a/curator-client/src/main/java/org/apache/curator/utils/CloseableExecutorServiceBase.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.apache.curator.utils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import java.io.Closeable; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Decorates an {@link ExecutorService} such that submitted tasks - * are recorded and can be closed en masse. - */ -abstract class CloseableExecutorServiceBase implements Closeable -{ - private final Set<Future<?>> futures = Sets.newSetFromMap(Maps.<Future<?>, Boolean>newConcurrentMap()); - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - protected abstract ListeningExecutorService getService(); - - @Override - public void close() - { - isClosed.set(true); - Iterator<Future<?>> iterator = futures.iterator(); - while ( iterator.hasNext() ) - { - Future<?> future = iterator.next(); - iterator.remove(); - future.cancel(true); - } - } - - /** - * @see ExecutorService#isShutdown() - * @return true/false - */ - public boolean isShutdown() - { - return getService().isShutdown(); - } - - /** - * @see ExecutorService#isTerminated() - * @return true/false - */ - public boolean isTerminated() - { - return getService().isTerminated(); - } - - /** - * Calls {@link ExecutorService#submit(Callable)}, records - * and returns the future - * - * @param task task to submit - * @return the future - */ - public <T> Future<T> submit(Callable<T> task) - { - return record(getService().submit(task)); - } - - /** - * Calls {@link ExecutorService#submit(Runnable)}, records - * and returns the future - * - * @param task task to submit - * @return the future - */ - public Future<?> submit(Runnable task) - { - return record(getService().submit(task)); - } - - @VisibleForTesting - int size() - { - return futures.size(); - } - - protected <T> ScheduledFuture<T> record(final ScheduledFuture<T> future) - { - if ( isClosed.get() ) - { - future.cancel(true); - } - else - { - futures.add(future); - } - return future; - } - - protected <T> Future<T> record(final ListenableFuture<T> future) - { - Runnable listener = new Runnable() - { - @Override - public void run() - { - futures.remove(future); - } - }; - if ( isClosed.get() ) - { - future.cancel(true); - } - else - { - futures.add(future); - future.addListener(listener, MoreExecutors.sameThreadExecutor()); - } - return future; - } -} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java index 8638ee6..737ff6b 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java +++ b/curator-client/src/main/java/org/apache/curator/utils/CloseableScheduledExecutorService.java @@ -1,100 +1,72 @@ package org.apache.curator.utils; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; +import com.google.common.base.Preconditions; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** - * Decorates an {@link ExecutorService} such that submitted tasks - * are recorded and can be closed en masse. + * Decoration on an ScheduledExecutorService that tracks created futures and provides + * a method to close futures created via this class */ -public class CloseableScheduledExecutorService extends CloseableExecutorServiceBase +public class CloseableScheduledExecutorService extends CloseableExecutorService { - private final ListeningScheduledExecutorService executorService; + private final ScheduledExecutorService scheduledExecutorService; /** - * @param executorService the service to decorate + * @param scheduledExecutorService the service to decorate */ - public CloseableScheduledExecutorService(ScheduledExecutorService executorService) + public CloseableScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { - this.executorService = MoreExecutors.listeningDecorator(executorService); - } - - @Override - protected ListeningExecutorService getService() - { - return executorService; + super(scheduledExecutorService); + this.scheduledExecutorService = scheduledExecutorService; } /** - * Calls {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}, records - * and returns the future + * Creates and executes a one-shot action that becomes enabled + * after the given delay. * - * @param command the task to execute + * @param task the task to execute * @param delay the time from now to delay execution - * @param unit the time unit of the delay parameter - * @return a ScheduledFuture representing pending completion of + * @param unit the time unit of the delay parameter + * @return a Future representing pending completion of * the task and whose <tt>get()</tt> method will return * <tt>null</tt> upon completion */ - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) + public Future<?> schedule(Runnable task, long delay, TimeUnit unit) { - return record(executorService.schedule(command, delay, unit)); - } + Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed"); - /** - * Calls {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}, records - * and returns the future - * - * @param callable the task to execute - * @param delay the time from now to delay execution - * @param unit the time unit of the delay parameter - * @return a ScheduledFuture representing pending completion of - * the task and whose <tt>get()</tt> method will return - * <tt>null</tt> upon completion - */ - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) - { - return record(executorService.schedule(callable, delay, unit)); + InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null)); + scheduledExecutorService.schedule(futureTask, delay, unit); + return futureTask; } /** - * Calls {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, records - * and returns the future + * Creates and executes a periodic action that becomes enabled first + * after the given initial delay, and subsequently with the + * given delay between the termination of one execution and the + * commencement of the next. If any execution of the task + * encounters an exception, subsequent executions are suppressed. + * Otherwise, the task will only terminate via cancellation or + * termination of the executor. * - * @param command the task to execute + * @param task the task to execute * @param initialDelay the time to delay first execution - * @param period the period between successive executions - * @param unit the time unit of the initialDelay and period parameters - * @return a ScheduledFuture representing pending completion of + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a Future representing pending completion of * the task, and whose <tt>get()</tt> method will throw an * exception upon cancellation */ - public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) + public Future<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { - return record(executorService.scheduleAtFixedRate(command, initialDelay, period, unit)); - } + Preconditions.checkState(isOpen.get(), "CloseableExecutorService is closed"); - /** - * Calls {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}, records - * and returns the future - * - * @param command the task to execute - * @param initialDelay the time to delay first execution - * @param delay the delay between the termination of one - * execution and the commencement of the next - * @param unit the time unit of the initialDelay and delay parameters - * @return a ScheduledFuture representing pending completion of - * the task, and whose <tt>get()</tt> method will throw an - * exception upon cancellation - */ - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) - { - return record(executorService.scheduleWithFixedDelay(command, initialDelay, delay, unit)); + InternalFutureTask<Void> futureTask = new InternalFutureTask<Void>(new FutureTask<Void>(task, null)); + scheduledExecutorService.scheduleWithFixedDelay(futureTask, initialDelay, delay, unit); + return futureTask; } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java b/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java deleted file mode 100644 index 51fe6a4..0000000 --- a/curator-client/src/main/java/org/apache/curator/utils/FutureContainer.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.apache.curator.utils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.io.Closeable; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RunnableFuture; - -public class FutureContainer implements Closeable -{ - private final List<Future<?>> futures = Lists.newArrayList(); - private final ExecutorService executorService; - - private class QueueingFuture<T> extends FutureTask<T> - { - private final RunnableFuture<T> task; - - QueueingFuture(RunnableFuture<T> task) - { - super(task, null); - this.task = task; - futures.add(task); - } - - protected void done() - { - futures.remove(task); - } - } - - public FutureContainer(ExecutorService executorService) - { - this.executorService = executorService; - } - - @VisibleForTesting - int size() - { - return futures.size(); - } - - @Override - public void close() - { - Iterator<Future<?>> iterator = futures.iterator(); - while ( iterator.hasNext() ) - { - Future<?> future = iterator.next(); - iterator.remove(); - if ( !future.cancel(true) ) - { - System.err.println("Could not cancel"); - throw new RuntimeException("Could not cancel"); - } - } - } - - /** - * Submits a value-returning task for execution and returns a Future - * representing the pending results of the task. Upon completion, - * this task may be taken or polled. - * - * @param task the task to submit - */ - public<V> void submit(Callable<V> task) - { - FutureTask<V> futureTask = new FutureTask<V>(task); - executorService.execute(new QueueingFuture<V>(futureTask)); - } - - /** - * Submits a Runnable task for execution and returns a Future - * representing that task. Upon completion, this task may be - * taken or polled. - * - * @param task the task to submit - */ - public void submit(Runnable task) - { - FutureTask<Void> futureTask = new FutureTask<Void>(task, null); - executorService.execute(new QueueingFuture<Void>(futureTask)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java ---------------------------------------------------------------------- diff --git a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java index 2cd2901..72b63fd 100644 --- a/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java +++ b/curator-client/src/test/java/org/apache/curator/utils/TestCloseableExecutorService.java @@ -12,20 +12,17 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; public class TestCloseableExecutorService { private static final int QTY = 10; private volatile ExecutorService executorService; - private volatile AtomicInteger count; @BeforeMethod public void setup() { executorService = Executors.newFixedThreadPool(QTY * 2); - count = new AtomicInteger(0); } @AfterMethod @@ -39,7 +36,7 @@ public class TestCloseableExecutorService { try { - FutureContainer service = new FutureContainer(executorService); + CloseableExecutorService service = new CloseableExecutorService(executorService); CountDownLatch startLatch = new CountDownLatch(QTY); CountDownLatch latch = new CountDownLatch(QTY); for ( int i = 0; i < QTY; ++i ) @@ -47,9 +44,9 @@ public class TestCloseableExecutorService submitRunnable(service, startLatch, latch); } - Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS)); service.close(); - Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); } catch ( AssertionError e ) { @@ -65,41 +62,39 @@ public class TestCloseableExecutorService public void testBasicCallable() throws InterruptedException { CloseableExecutorService service = new CloseableExecutorService(executorService); - List<CountDownLatch> latches = Lists.newArrayList(); + final CountDownLatch startLatch = new CountDownLatch(QTY); + final CountDownLatch latch = new CountDownLatch(QTY); for ( int i = 0; i < QTY; ++i ) { - final CountDownLatch latch = new CountDownLatch(1); - latches.add(latch); service.submit - ( - new Callable<Void>() + ( + new Callable<Void>() + { + @Override + public Void call() throws Exception { - @Override - public Void call() throws Exception + try + { + startLatch.countDown(); + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + finally { - try - { - Thread.currentThread().join(); - } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - } - finally - { - latch.countDown(); - } - return null; + latch.countDown(); } + return null; } - ); + } + ); } + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS)); service.close(); - for ( CountDownLatch latch : latches ) - { - Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); - } + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); } @Test @@ -107,6 +102,7 @@ public class TestCloseableExecutorService { CloseableExecutorService service = new CloseableExecutorService(executorService); List<Future<?>> futures = Lists.newArrayList(); + final CountDownLatch startLatch = new CountDownLatch(QTY); for ( int i = 0; i < QTY; ++i ) { Future<?> future = service.submit @@ -118,6 +114,7 @@ public class TestCloseableExecutorService { try { + startLatch.countDown(); Thread.currentThread().join(); } catch ( InterruptedException e ) @@ -130,7 +127,7 @@ public class TestCloseableExecutorService futures.add(future); } - Thread.sleep(100); + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS)); for ( Future<?> future : futures ) { @@ -144,6 +141,7 @@ public class TestCloseableExecutorService public void testListeningCallable() throws InterruptedException { CloseableExecutorService service = new CloseableExecutorService(executorService); + final CountDownLatch startLatch = new CountDownLatch(QTY); List<Future<?>> futures = Lists.newArrayList(); for ( int i = 0; i < QTY; ++i ) { @@ -156,6 +154,7 @@ public class TestCloseableExecutorService { try { + startLatch.countDown(); Thread.currentThread().join(); } catch ( InterruptedException e ) @@ -169,8 +168,7 @@ public class TestCloseableExecutorService futures.add(future); } - Thread.sleep(100); - + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS)); for ( Future<?> future : futures ) { future.cancel(true); @@ -182,69 +180,52 @@ public class TestCloseableExecutorService @Test public void testPartialRunnable() throws InterruptedException { - try - { - final CountDownLatch outsideLatch = new CountDownLatch(1); - executorService.submit - ( - new Runnable() + final CountDownLatch outsideLatch = new CountDownLatch(1); + executorService.submit + ( + new Runnable() + { + @Override + public void run() { - @Override - public void run() + try { - try - { - Thread.currentThread().join(); - } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - } - finally - { - outsideLatch.countDown(); - } + Thread.currentThread().join(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + } + finally + { + outsideLatch.countDown(); } } - ); - - FutureContainer service = new FutureContainer(executorService); - CountDownLatch startLatch = new CountDownLatch(QTY); - CountDownLatch latch = new CountDownLatch(QTY); - for ( int i = 0; i < QTY; ++i ) - { - submitRunnable(service, startLatch, latch); } + ); - while ( service.size() < QTY ) - { - Thread.sleep(100); - } - - Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); - service.close(); - Assert.assertTrue(latch.await(3, TimeUnit.SECONDS), "Latch = " + latch.getCount() + " Count = " + count.get() + " - Size = " + service.size()); - Assert.assertEquals(outsideLatch.getCount(), 1); - } - catch ( AssertionError e ) - { - throw e; - } - catch ( Throwable e ) + CloseableExecutorService service = new CloseableExecutorService(executorService); + CountDownLatch startLatch = new CountDownLatch(QTY); + CountDownLatch latch = new CountDownLatch(QTY); + for ( int i = 0; i < QTY; ++i ) { - e.printStackTrace(); + submitRunnable(service, startLatch, latch); } - finally + + while ( service.size() < QTY ) { - executorService.shutdownNow(); + Thread.sleep(100); } + + Assert.assertTrue(startLatch.await(3, TimeUnit.SECONDS)); + service.close(); + Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); + Assert.assertEquals(outsideLatch.getCount(), 1); } - private void submitRunnable(FutureContainer service, final CountDownLatch startLatch, final CountDownLatch latch) + private void submitRunnable(CloseableExecutorService service, final CountDownLatch startLatch, final CountDownLatch latch) { - try - { - service.submit + service.submit ( new Runnable() { @@ -254,29 +235,18 @@ public class TestCloseableExecutorService try { startLatch.countDown(); - count.incrementAndGet(); Thread.sleep(100000); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } - catch ( Throwable e ) - { - e.printStackTrace(); - } finally { - // count.decrementAndGet(); latch.countDown(); } } } ); - } - catch ( Throwable e ) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index f42039c..61c3af7 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -263,16 +263,16 @@ public class PathChildrenCache implements Closeable client.getConnectionStateListenable().addListener(connectionStateListener); executorService.submit - ( - new Runnable() - { - @Override - public void run() + ( + new Runnable() { - mainLoop(); + @Override + public void run() + { + mainLoop(); + } } - } - ); + ); switch ( mode ) { http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java index b0717ed..c8c1510 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.base.Preconditions; import com.google.common.io.Closeables; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.CloseableScheduledExecutorService; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.data.Stat; @@ -29,8 +30,8 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -46,10 +47,10 @@ public class ChildReaper implements Closeable private final CuratorFramework client; private final String path; private final Reaper.Mode mode; - private final ScheduledExecutorService executor; + private final CloseableScheduledExecutorService executor; private final int reapingThresholdMs; - private volatile ScheduledFuture<?> task; + private volatile Future<?> task; private enum State { @@ -91,7 +92,7 @@ public class ChildReaper implements Closeable this.client = client; this.path = path; this.mode = mode; - this.executor = executor; + this.executor = new CloseableScheduledExecutorService(executor); this.reapingThresholdMs = reapingThresholdMs; this.reaper = new Reaper(client, executor, reapingThresholdMs); } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java index b540689..037eacd 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -153,7 +154,7 @@ public class Reaper implements Closeable public void addPath(String path, Mode mode) { activePaths.add(path); - executor.schedule(new PathHolder(path, mode, 0), reapingThresholdMs, TimeUnit.MILLISECONDS); + schedule(new PathHolder(path, mode, 0), reapingThresholdMs); } /** @@ -186,6 +187,12 @@ public class Reaper implements Closeable } } + @VisibleForTesting + protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs) + { + return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS); + } + private void reap(PathHolder holder) { if ( !activePaths.contains(holder.path) ) @@ -251,7 +258,7 @@ public class Reaper implements Closeable } else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) ) { - executor.schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs, TimeUnit.MILLISECONDS); + schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs); } } http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/10df9fc2/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java index 596960d..bd821e4 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.locks; import com.google.common.io.Closeables; +import junit.framework.Assert; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.BaseClassForTests; @@ -27,13 +28,20 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.Timing; -import junit.framework.Assert; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.testng.annotations.Test; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; public class TestReaper extends BaseClassForTests { @@ -106,37 +114,36 @@ public class TestReaper extends BaseClassForTests final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>(); final ExecutorService pool = Executors.newCachedThreadPool(); - ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1) + ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1); + + reaper = new Reaper + ( + client, + service, + THRESHOLD + ) { @Override - public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) + protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs) { - final Reaper.PathHolder pathHolder = (Reaper.PathHolder)command; holders.add(pathHolder); - final ScheduledFuture<?> f = super.schedule(command, delay, unit); + final Future<?> f = super.schedule(pathHolder, reapingThresholdMs); pool.submit - ( - new Callable<Void>() - { - @Override - public Void call() throws Exception + ( + new Callable<Void>() { - f.get(); - holders.remove(pathHolder); - return null; + @Override + public Void call() throws Exception + { + f.get(); + holders.remove(pathHolder); + return null; + } } - } - ); - return f; + ); + return null; } }; - - reaper = new Reaper - ( - client, - service, - THRESHOLD - ); reaper.start(); reaper.addPath("/one/two/three");