continued work on tests, etc.

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cb34e6f6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cb34e6f6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cb34e6f6

Branch: refs/heads/CURATOR-266
Commit: cb34e6f6a41b08c9d4e6179d9f893b0e48e7860c
Parents: 2827ba8
Author: randgalt <randg...@apache.org>
Authored: Sun Sep 27 13:31:32 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Sep 27 13:31:32 2015 -0500

----------------------------------------------------------------------
 .../ensemble/fixed/FixedEnsembleProvider.java   | 16 ++++++-
 .../framework/imps/CuratorFrameworkImpl.java    |  5 +++
 .../curator/framework/imps/EnsembleTracker.java | 45 ++++++++++++--------
 .../src/site/confluence/index.confluence        |  1 +
 .../framework/imps/TestFrameworkBackground.java |  9 ++--
 .../framework/imps/TestReconfiguration.java     |  9 +++-
 .../recipes/nodes/PersistentEphemeralNode.java  | 27 ++++++------
 .../curator/framework/imps/TestCleanState.java  |  9 ++++
 .../locks/TestInterProcessSemaphoreCluster.java |  6 +++
 .../nodes/TestPersistentEphemeralNode.java      | 16 ++++---
 src/site/confluence/utilities.confluence        |  7 +--
 11 files changed, 104 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
index 28ad1b6..5f486f4 100644
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
+++ 
b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
@@ -20,6 +20,7 @@ package org.apache.curator.ensemble.fixed;
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.zookeeper.ZooKeeper;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class FixedEnsembleProvider implements EnsembleProvider
 {
     private final AtomicReference<String> connectionString = new 
AtomicReference<>();
+    private final boolean updateServerListEnabled;
 
     /**
      * The connection string to use
@@ -37,6 +39,18 @@ public class FixedEnsembleProvider implements 
EnsembleProvider
      */
     public FixedEnsembleProvider(String connectionString)
     {
+        this(connectionString, true);
+    }
+
+    /**
+     * The connection string to use
+     *
+     * @param connectionString connection string
+     * @param updateServerListEnabled if true, allow Curator to call {@link 
ZooKeeper#updateServerList(String)}
+     */
+    public FixedEnsembleProvider(String connectionString, boolean 
updateServerListEnabled)
+    {
+        this.updateServerListEnabled = updateServerListEnabled;
         this.connectionString.set(Preconditions.checkNotNull(connectionString, 
"connectionString cannot be null"));
     }
 
@@ -67,6 +81,6 @@ public class FixedEnsembleProvider implements EnsembleProvider
     @Override
     public boolean updateServerListEnabled()
     {
-        return true;
+        return updateServerListEnabled;
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index f2f578c..c3215ad 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -770,6 +770,11 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         connectionStateManager.addStateChange(newConnectionState);
     }
 
+    EnsembleTracker getEnsembleTracker()
+    {
+        return ensembleTracker;
+    }
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean 
checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent 
event)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
index d8092fe..acd01ee 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
@@ -43,10 +44,10 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
 @VisibleForTesting
-public class EnsembleTracker implements Closeable
+public class EnsembleTracker implements Closeable, CuratorWatcher
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final EnsembleProvider ensembleProvider;
     private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
     private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
@@ -68,18 +69,6 @@ public class EnsembleTracker implements Closeable
         }
     };
 
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
-            {
-                reset();
-            }
-        }
-    };
-
     private enum State
     {
         LATENT,
@@ -89,7 +78,7 @@ public class EnsembleTracker implements Closeable
 
     EnsembleTracker(CuratorFramework client, EnsembleProvider ensembleProvider)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.ensembleProvider = ensembleProvider;
     }
 
@@ -103,7 +92,20 @@ public class EnsembleTracker implements Closeable
     @Override
     public void close()
     {
-        
client.getConnectionStateListenable().removeListener(connectionStateListener);
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            client.removeWatchers();
+            
client.getConnectionStateListenable().removeListener(connectionStateListener);
+        }
+    }
+
+    @Override
+    public void process(WatchedEvent event) throws Exception
+    {
+        if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+        {
+            reset();
+        }
     }
 
     private void reset() throws Exception
@@ -119,7 +121,7 @@ public class EnsembleTracker implements Closeable
                 }
             }
         };
-        
client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+        
client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble();
     }
 
     @VisibleForTesting
@@ -145,6 +147,13 @@ public class EnsembleTracker implements Closeable
     {
         log.info("New config event received: " + Arrays.toString(data));
         String connectionString = configToConnectionString(data);
-        ensembleProvider.setConnectionString(connectionString);
+        if ( connectionString.trim().length() > 0 )
+        {
+            ensembleProvider.setConnectionString(connectionString);
+        }
+        else
+        {
+            log.debug("Ignoring new config as it is empty");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-framework/src/site/confluence/index.confluence 
b/curator-framework/src/site/confluence/index.confluence
index 1f5f329..13df0de 100644
--- a/curator-framework/src/site/confluence/index.confluence
+++ b/curator-framework/src/site/confluence/index.confluence
@@ -7,6 +7,7 @@ ZooKeeper and handles the complexity of managing connections to 
the ZooKeeper cl
 ** There are potential error cases that require ZooKeeper clients to recreate 
a connection and/or retry operations. Curator
  automatically and transparently (mostly) handles these cases.
 ** Watches for NodeDataChanged events and calls updateServerList() as needed.
+** Watches are automatically removed by Curator recipes
 * Cleaner API:
 ** simplifies the raw ZooKeeper methods, events, etc.
 ** provides a modern, fluent interface

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 6575018..83dab6b 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -34,6 +34,8 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -46,6 +48,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class TestFrameworkBackground extends BaseClassForTests
 {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     @Test
     public void testListenerConnectedAtStart() throws Exception
     {
@@ -160,11 +164,10 @@ public class TestFrameworkBackground extends 
BaseClassForTests
                 }
             };
             client.create().inBackground(callback).forPath("/one");
-            client.create().inBackground(callback).forPath("/one/two");
-            client.create().inBackground(callback).forPath("/one/two/three");
-
             Assert.assertEquals(paths.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), "/one");
+            client.create().inBackground(callback).forPath("/one/two");
             Assert.assertEquals(paths.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), "/one/two");
+            client.create().inBackground(callback).forPath("/one/two/three");
             Assert.assertEquals(paths.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), "/one/two/three");
         }
         finally

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index 0ec796b..e399a4d 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -45,6 +45,10 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -380,11 +384,12 @@ public class TestReconfiguration extends BaseClassForTests
         }
     }
 
-    private List<String> toReconfigSpec(Collection<InstanceSpec> instances)
+    private List<String> toReconfigSpec(Collection<InstanceSpec> instances) 
throws Exception
     {
+        String localhost = new InetSocketAddress((InetAddress)null, 
0).getAddress().getHostAddress();
         List<String> specs = Lists.newArrayList();
         for ( InstanceSpec instance : instances ) {
-            specs.add("server." + instance.getServerId() + "=localhost:" + 
instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + 
instance.getPort());
+            specs.add("server." + instance.getServerId() + "=" + localhost + 
":" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + 
instance.getPort());
         }
         return specs;
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 38c632a..f7a4ff4 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -114,25 +114,13 @@ public class PersistentEphemeralNode implements Closeable
         {
             if ( newState == ConnectionState.RECONNECTED )
             {
-                if ( debugReconnectLatch != null )
-                {
-                    try
-                    {
-                        debugReconnectLatch.await();
-                    }
-                    catch ( InterruptedException e )
-                    {
-                        Thread.currentThread().interrupt();
-                        e.printStackTrace();
-                    }
-                }
                 createNode();
             }
         }
     };
 
     @VisibleForTesting
-    volatile CountDownLatch debugReconnectLatch = null;
+    volatile CountDownLatch debugCreateNodeLatch = null;
 
     private enum State
     {
@@ -401,6 +389,19 @@ public class PersistentEphemeralNode implements Closeable
             return;
         }
 
+        if ( debugCreateNodeLatch != null )
+        {
+            try
+            {
+                debugCreateNodeLatch.await();
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            }
+        }
+
         try
         {
             String existingPath = nodePath.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
index 82de1fc..f90f463 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -35,6 +35,11 @@ public class TestCleanState
         try
         {
             CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
+            EnsembleTracker ensembleTracker = 
internalClient.getEnsembleTracker();
+            if ( ensembleTracker != null )
+            {
+                ensembleTracker.close();
+            }
             ZooKeeper zooKeeper = internalClient.getZooKeeper();
             if ( zooKeeper != null )
             {
@@ -52,6 +57,10 @@ public class TestCleanState
                 }
             }
         }
+        catch ( IllegalStateException ignore )
+        {
+            // client already closed
+        }
         catch ( Exception e )
         {
             e.printStackTrace();    // not sure what to do here

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index ee49288..c06d042 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -70,6 +70,12 @@ public class TestInterProcessSemaphoreCluster
                 }
 
                 @Override
+                public boolean updateServerListEnabled()
+                {
+                    return false;
+                }
+
+                @Override
                 public void start() throws Exception
                 {
                 }

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 4162886..0ee6dec 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -306,7 +306,6 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
         CuratorFramework observer = newCurator();
 
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, 
PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
-        node.debugReconnectLatch = new CountDownLatch(1);
         node.start();
         try
         {
@@ -317,11 +316,12 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             Trigger deletedTrigger = Trigger.deleted();
             
observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
+            node.debugCreateNodeLatch = new CountDownLatch(1);
             KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted
             
assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), 
TimeUnit.SECONDS));
-            node.debugReconnectLatch.countDown();
+            node.debugCreateNodeLatch.countDown();
         }
         finally
         {
@@ -336,7 +336,6 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
         CuratorFramework observer = newCurator();
 
         PersistentEphemeralNode node = new PersistentEphemeralNode(curator, 
PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]);
-        node.debugReconnectLatch = new CountDownLatch(1);
         node.start();
         try
         {
@@ -346,11 +345,12 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             Trigger deletedTrigger = Trigger.deleted();
             
observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
 
+            node.debugCreateNodeLatch = new CountDownLatch(1);
             KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
             // Make sure the node got deleted...
             
assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), 
TimeUnit.SECONDS));
-            node.debugReconnectLatch.countDown();
+            node.debugCreateNodeLatch.countDown();
 
             // Check for it to be recreated...
             Trigger createdTrigger = Trigger.created();
@@ -380,16 +380,16 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             // We should be able to disconnect multiple times and each time 
the node should be recreated.
             for ( int i = 0; i < 5; i++ )
             {
-                node.debugReconnectLatch = new CountDownLatch(1);
                 Trigger deletionTrigger = Trigger.deleted();
                 
observer.checkExists().usingWatcher(deletionTrigger).forPath(path);
 
+                node.debugCreateNodeLatch = new CountDownLatch(1);
                 // Kill the session, thus cleaning up the node...
                 KillSession.kill(curator.getZookeeperClient().getZooKeeper());
 
                 // Make sure the node ended up getting deleted...
                 
assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(),
 TimeUnit.SECONDS));
-                node.debugReconnectLatch.countDown();
+                node.debugCreateNodeLatch.countDown();
 
                 // Now put a watch in the background looking to see if it gets 
created...
                 Trigger creationTrigger = Trigger.created();
@@ -706,6 +706,10 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             {
                 latch.countDown();
             }
+            else if ( type != EventType.None )
+            {
+                Assert.fail("Unexpected watcher event: " + event);
+            }
         }
 
         public boolean firedWithin(long duration, TimeUnit unit)

http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/src/site/confluence/utilities.confluence
----------------------------------------------------------------------
diff --git a/src/site/confluence/utilities.confluence 
b/src/site/confluence/utilities.confluence
index efacb3c..3a62fa5 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -38,7 +38,8 @@ Due to limitations in ZooKeeper's transport layer, a single 
queue will break if
 provides a facade over multiple distributed queues. It monitors the queues and 
if any one of them goes over a threshold, a new
 queue is added. Puts are distributed amongst the queues.
 
-h2. EnsembleTracker
+h2. WatcherRemoveCuratorFramework
 
-Utility to listen for ensemble/configuration changes via registered 
EnsembleListeners. Allocate a EnsembleTracker, add one or more listeners
-and start it.
+Curator has a utility that makes it easy to set watchers and remove them at a 
later date. It is used for all Curator recipes.
+From your CuratorFramework instance, call newWatcherRemoveCuratorFramework(). 
When using this proxy instance any watchers that are
+set are recorded. You can then call removeWatchers() to remove those watchers. 
See the Curator source code for usage details.

Reply via email to