Repository: curator Updated Branches: refs/heads/CURATOR-161 22d034af9 -> ba4da2c3c
CURATOR-161 - Modified the background processing framework to allow operations to indicate that they do not require a live connection in order to execute (this is needed to run the remove watches with 'local' set to true). Cleaned up some unit tests. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ba4da2c3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ba4da2c3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ba4da2c3 Branch: refs/heads/CURATOR-161 Commit: ba4da2c3c7048ea249f18e7b4c815db76f0b1ad0 Parents: 22d034a Author: Cameron McKenzie <came...@unico.com.au> Authored: Thu May 14 09:19:09 2015 +1000 Committer: Cameron McKenzie <came...@unico.com.au> Committed: Thu May 14 09:19:09 2015 +1000 ---------------------------------------------------------------------- .../framework/imps/CuratorFrameworkImpl.java | 2 +- .../framework/imps/OperationAndData.java | 16 ++++- .../imps/RemoveWatchesBuilderImpl.java | 22 ++++-- .../framework/imps/TestRemoveWatches.java | 73 ++++++++------------ 4 files changed, 58 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index b4a1d93..c82f984 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -821,7 +821,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { try { - if ( client.isConnected() ) + if ( 
!operationAndData.isConnectionRequired() || client.isConnected() ) { operationAndData.callPerformBackgroundOperation(); } http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java index 38f59a0..b46cddb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java @@ -40,25 +40,37 @@ class OperationAndData<T> implements Delayed, RetrySleeper private final AtomicLong sleepUntilTimeMs = new AtomicLong(0); private final long ordinal = nextOrdinal.getAndIncrement(); private final Object context; + private final boolean connectionRequired; interface ErrorCallback<T> { void retriesExhausted(OperationAndData<T> operationAndData); } - - OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context) + + OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired) { this.operation = operation; this.data = data; this.callback = callback; this.errorCallback = errorCallback; this.context = context; + this.connectionRequired = connectionRequired; + } + + OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context) + { + this(operation, data, callback, errorCallback, context, true); } Object getContext() { return context; } + + boolean isConnectionRequired() + { + return connectionRequired; + } void callPerformBackgroundOperation() throws Exception { 
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index 27d05da..932706b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -166,15 +166,23 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat private void pathInBackground(final String path) { - OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>() + OperationAndData.ErrorCallback<String> errorCallback = null; + + //Only need an error callback if we're in guaranteed mode + if(guaranteed) { - @Override - public void retriesExhausted(OperationAndData<String> operationAndData) + errorCallback = new OperationAndData.ErrorCallback<String>() { - client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher)); - } - }; - client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), null); + @Override + public void retriesExhausted(OperationAndData<String> operationAndData) + { + client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher)); + } + }; + } + + client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), + errorCallback, backgrounding.getContext(), !local), null); } private void pathInForeground(final String path) throws Exception 
http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index 518f13b..fc15f0c 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -1,8 +1,9 @@ package org.apache.curator.framework.imps; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -24,13 +25,30 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.Test; public class TestRemoveWatches extends BaseClassForTests { + private boolean blockUntilDesiredConnectionState(CuratorFramework client, Timing timing, final ConnectionState desiredState) + { + final CountDownLatch latch = new CountDownLatch(1); + client.getConnectionStateListenable().addListener(new ConnectionStateListener() + { + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if(newState == desiredState) + { + latch.countDown(); + } + } + }); + + return timing.awaitLatch(latch); + } + @Test public void testRemoveCuratorDefaultWatcher() throws Exception { @@ -330,7 
+348,7 @@ public class TestRemoveWatches extends BaseClassForTests //Stop the server so we can check if we can remove watches locally when offline server.stop(); - timing.sleepABit(); + blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); client.watches().removeAll().locally().forPath(path); @@ -364,7 +382,7 @@ public class TestRemoveWatches extends BaseClassForTests //Stop the server so we can check if we can remove watches locally when offline server.stop(); - timing.sleepABit(); + blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); client.watches().removeAll().locally().inBackground().forPath(path); @@ -452,25 +470,7 @@ public class TestRemoveWatches extends BaseClassForTests try { client.start(); - - final CountDownLatch reconnectedLatch = new CountDownLatch(1); - final CountDownLatch suspendedLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener(new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState == ConnectionState.SUSPENDED) - { - suspendedLatch.countDown(); - } - else if(newState == ConnectionState.RECONNECTED) - { - reconnectedLatch.countDown(); - } - } - }); - + String path = "/"; CountDownLatch removeLatch = new CountDownLatch(1); @@ -479,7 +479,8 @@ public class TestRemoveWatches extends BaseClassForTests client.checkExists().usingWatcher(watcher).forPath(path); server.stop(); - timing.awaitLatch(suspendedLatch); + + blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); //Remove the watch while we're not connected try @@ -510,25 +511,7 @@ public class TestRemoveWatches extends BaseClassForTests try { client.start(); - - final CountDownLatch reconnectedLatch = new CountDownLatch(1); - final CountDownLatch suspendedLatch = new CountDownLatch(1); - client.getConnectionStateListenable().addListener(new ConnectionStateListener() - { - @Override - public void 
stateChanged(CuratorFramework client, ConnectionState newState) - { - if(newState == ConnectionState.SUSPENDED) - { - suspendedLatch.countDown(); - } - else if(newState == ConnectionState.RECONNECTED) - { - reconnectedLatch.countDown(); - } - } - }); - + final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1); ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>() @@ -550,7 +533,7 @@ public class TestRemoveWatches extends BaseClassForTests client.checkExists().usingWatcher(watcher).forPath(path); server.stop(); - timing.awaitLatch(suspendedLatch); + blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); //Remove the watch while we're not connected client.watches().remove(watcher).guaranteed().inBackground().forPath(path);