Repository: curator
Updated Branches:
  refs/heads/CURATOR-161 22d034af9 -> ba4da2c3c


CURATOR-161 - Modified the background processing framework to allow
operations to declare that they do not need a live connection to execute
(this is needed to run the remove watches with 'local' set to true).
Cleaned up some unit tests.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ba4da2c3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ba4da2c3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ba4da2c3

Branch: refs/heads/CURATOR-161
Commit: ba4da2c3c7048ea249f18e7b4c815db76f0b1ad0
Parents: 22d034a
Author: Cameron McKenzie <came...@unico.com.au>
Authored: Thu May 14 09:19:09 2015 +1000
Committer: Cameron McKenzie <came...@unico.com.au>
Committed: Thu May 14 09:19:09 2015 +1000

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    |  2 +-
 .../framework/imps/OperationAndData.java        | 16 ++++-
 .../imps/RemoveWatchesBuilderImpl.java          | 22 ++++--
 .../framework/imps/TestRemoveWatches.java       | 73 ++++++++------------
 4 files changed, 58 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b4a1d93..c82f984 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -821,7 +821,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
     {
         try
         {
-            if ( client.isConnected() )
+            if ( !operationAndData.isConnectionRequired() || 
client.isConnected() )
             {
                 operationAndData.callPerformBackgroundOperation();
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@ class OperationAndData<T> implements Delayed, RetrySleeper
     private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
     private final long ordinal = nextOrdinal.getAndIncrement();
     private final Object context;
+    private final boolean connectionRequired;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
-
-    OperationAndData(BackgroundOperation<T> operation, T data, 
BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    
+    OperationAndData(BackgroundOperation<T> operation, T data, 
BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, 
boolean connectionRequired)
     {
         this.operation = operation;
         this.data = data;
         this.callback = callback;
         this.errorCallback = errorCallback;
         this.context = context;
+        this.connectionRequired = connectionRequired;
+    }      
+
+    OperationAndData(BackgroundOperation<T> operation, T data, 
BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    {
+        this(operation, data, callback, errorCallback, context, true);
     }
 
     Object getContext()
     {
         return context;
     }
+    
+    boolean isConnectionRequired()
+    {
+        return connectionRequired;
+    }
 
     void callPerformBackgroundOperation() throws Exception
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
index 27d05da..932706b 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -166,15 +166,23 @@ public class RemoveWatchesBuilderImpl implements 
RemoveWatchesBuilder, RemoveWat
     
     private void pathInBackground(final String path)
     {
-        OperationAndData.ErrorCallback<String>  errorCallback = new 
OperationAndData.ErrorCallback<String>()
+        OperationAndData.ErrorCallback<String>  errorCallback = null;
+        
+        //Only need an error callback if we're in guaranteed mode
+        if(guaranteed)
         {
-            @Override
-            public void retriesExhausted(OperationAndData<String> 
operationAndData)
+            errorCallback = new OperationAndData.ErrorCallback<String>()
             {
-                client.getFailedRemoveWatcherManager().addFailedOperation(new 
FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
-            }            
-        };        
-        client.processBackgroundOperation(new OperationAndData<String>(this, 
path, backgrounding.getCallback(), errorCallback, backgrounding.getContext()), 
null);
+                @Override
+                public void retriesExhausted(OperationAndData<String> 
operationAndData)
+                {
+                    
client.getFailedRemoveWatcherManager().addFailedOperation(new 
FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+                }            
+            };
+        }
+        
+        client.processBackgroundOperation(new OperationAndData<String>(this, 
path, backgrounding.getCallback(),
+                                                                       
errorCallback, backgrounding.getContext(), !local), null);
     }
     
     private void pathInForeground(final String path) throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/ba4da2c3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
index 518f13b..fc15f0c 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -1,8 +1,9 @@
 package org.apache.curator.framework.imps;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -24,13 +25,30 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestRemoveWatches extends BaseClassForTests
 {
+    private boolean blockUntilDesiredConnectionState(CuratorFramework client, 
Timing timing, final ConnectionState desiredState)
+    {
+        final CountDownLatch latch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new 
ConnectionStateListener()
+        {
+            
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState 
newState)
+            {
+                if(newState == desiredState)
+                {
+                    latch.countDown();
+                }
+            }
+        });
+        
+        return timing.awaitLatch(latch);
+    }
+    
     @Test
     public void testRemoveCuratorDefaultWatcher() throws Exception
     {
@@ -330,7 +348,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally 
when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, 
ConnectionState.SUSPENDED);
                        
             client.watches().removeAll().locally().forPath(path);
             
@@ -364,7 +382,7 @@ public class TestRemoveWatches extends BaseClassForTests
             //Stop the server so we can check if we can remove watches locally 
when offline
             server.stop();
             
-            timing.sleepABit();
+            blockUntilDesiredConnectionState(client, timing, 
ConnectionState.SUSPENDED);
                        
             
client.watches().removeAll().locally().inBackground().forPath(path);
             
@@ -452,25 +470,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new 
ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                       
             String path = "/";
             
             CountDownLatch removeLatch = new CountDownLatch(1);
@@ -479,7 +479,8 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            
+            blockUntilDesiredConnectionState(client, timing, 
ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             try 
@@ -510,25 +511,7 @@ public class TestRemoveWatches extends BaseClassForTests
         try
         {
             client.start();
-            
-            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
-            final CountDownLatch suspendedLatch = new CountDownLatch(1);
-            client.getConnectionStateListenable().addListener(new 
ConnectionStateListener()
-            {
-                @Override
-                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
-                {
-                    if(newState == ConnectionState.SUSPENDED)
-                    {
-                        suspendedLatch.countDown();
-                    }
-                    else if(newState == ConnectionState.RECONNECTED)
-                    {
-                        reconnectedLatch.countDown();
-                    }
-                }
-            });
-            
+                        
             final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
             
             
((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = 
new 
FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
@@ -550,7 +533,7 @@ public class TestRemoveWatches extends BaseClassForTests
             client.checkExists().usingWatcher(watcher).forPath(path);
             
             server.stop();           
-            timing.awaitLatch(suspendedLatch);
+            blockUntilDesiredConnectionState(client, timing, 
ConnectionState.SUSPENDED);
             
             //Remove the watch while we're not connected
             
client.watches().remove(watcher).guaranteed().inBackground().forPath(path);

Reply via email to