continued work on tests, etc.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/cb34e6f6 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/cb34e6f6 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/cb34e6f6 Branch: refs/heads/CURATOR-266 Commit: cb34e6f6a41b08c9d4e6179d9f893b0e48e7860c Parents: 2827ba8 Author: randgalt <randg...@apache.org> Authored: Sun Sep 27 13:31:32 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Sep 27 13:31:32 2015 -0500 ---------------------------------------------------------------------- .../ensemble/fixed/FixedEnsembleProvider.java | 16 ++++++- .../framework/imps/CuratorFrameworkImpl.java | 5 +++ .../curator/framework/imps/EnsembleTracker.java | 45 ++++++++++++-------- .../src/site/confluence/index.confluence | 1 + .../framework/imps/TestFrameworkBackground.java | 9 ++-- .../framework/imps/TestReconfiguration.java | 9 +++- .../recipes/nodes/PersistentEphemeralNode.java | 27 ++++++------ .../curator/framework/imps/TestCleanState.java | 9 ++++ .../locks/TestInterProcessSemaphoreCluster.java | 6 +++ .../nodes/TestPersistentEphemeralNode.java | 16 ++++--- src/site/confluence/utilities.confluence | 7 +-- 11 files changed, 104 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java index 28ad1b6..5f486f4 100644 --- a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java +++ b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java @@ -20,6 +20,7 @@ package org.apache.curator.ensemble.fixed; import com.google.common.base.Preconditions; import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; @@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; public class FixedEnsembleProvider implements EnsembleProvider { private final AtomicReference<String> connectionString = new AtomicReference<>(); + private final boolean updateServerListEnabled; /** * The connection string to use @@ -37,6 +39,18 @@ public class FixedEnsembleProvider implements EnsembleProvider */ public FixedEnsembleProvider(String connectionString) { + this(connectionString, true); + } + + /** + * The connection string to use + * + * @param connectionString connection string + * @param updateServerListEnabled if true, allow Curator to call {@link ZooKeeper#updateServerList(String)} + */ + public FixedEnsembleProvider(String connectionString, boolean updateServerListEnabled) + { + this.updateServerListEnabled = updateServerListEnabled; this.connectionString.set(Preconditions.checkNotNull(connectionString, "connectionString cannot be null")); } @@ -67,6 +81,6 @@ public class FixedEnsembleProvider implements EnsembleProvider @Override public boolean updateServerListEnabled() { - return true; + return updateServerListEnabled; } } http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index f2f578c..c3215ad 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -770,6 +770,11 @@ public class CuratorFrameworkImpl implements CuratorFramework connectionStateManager.addStateChange(newConnectionState); } + EnsembleTracker getEnsembleTracker() + { + return ensembleTracker; + } + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"}) private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) { http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index d8092fe..acd01ee 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; @@ -43,10 +44,10 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; @VisibleForTesting -public class EnsembleTracker implements Closeable +public class EnsembleTracker implements Closeable, CuratorWatcher { private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorFramework client; + private final WatcherRemoveCuratorFramework client; private final EnsembleProvider ensembleProvider; private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() @@ -68,18 +69,6 @@ public class EnsembleTracker implements Closeable } }; - private final CuratorWatcher watcher = new CuratorWatcher() - { - @Override - public void process(WatchedEvent event) throws Exception - { - if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) - { - reset(); - } - } - }; - private enum State { LATENT, @@ -89,7 +78,7 @@ public class EnsembleTracker implements Closeable EnsembleTracker(CuratorFramework client, EnsembleProvider ensembleProvider) { - this.client = client; + this.client = client.newWatcherRemoveCuratorFramework(); this.ensembleProvider = ensembleProvider; } @@ -103,7 +92,20 @@ public class EnsembleTracker implements Closeable @Override public void close() { - client.getConnectionStateListenable().removeListener(connectionStateListener); + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + client.removeWatchers(); + client.getConnectionStateListenable().removeListener(connectionStateListener); + } + } + + @Override + public void process(WatchedEvent event) throws Exception + { + if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) + { + reset(); + } } private void reset() throws Exception @@ -119,7 +121,7 @@ public class EnsembleTracker implements Closeable } } }; - client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble(); + client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble(); } @VisibleForTesting @@ -145,6 +147,13 @@ public class EnsembleTracker implements Closeable { log.info("New config event received: " + Arrays.toString(data)); String connectionString = configToConnectionString(data); - ensembleProvider.setConnectionString(connectionString); + if ( connectionString.trim().length() > 0 ) + { + ensembleProvider.setConnectionString(connectionString); + } + else + { + log.debug("Ignoring new config as it is empty"); + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-framework/src/site/confluence/index.confluence b/curator-framework/src/site/confluence/index.confluence index 1f5f329..13df0de 100644 --- a/curator-framework/src/site/confluence/index.confluence +++ b/curator-framework/src/site/confluence/index.confluence @@ -7,6 +7,7 @@ ZooKeeper and handles the complexity of managing connections to the ZooKeeper cl ** There are potential error cases that require ZooKeeper clients to recreate a connection and/or retry operations. Curator automatically and transparently (mostly) handles these cases. ** Watches for NodeDataChanged events and calls updateServerList() as needed. +** Watches are automatically removed by Curator recipes * Cleaner API: ** simplifies the raw ZooKeeper methods, events, etc. ** provides a modern, fluent interface http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java index 6575018..83dab6b 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -34,6 +34,8 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.KeeperException.Code; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; @@ -46,6 +48,8 @@ import java.util.concurrent.atomic.AtomicReference; public class TestFrameworkBackground extends BaseClassForTests { + private final Logger log = LoggerFactory.getLogger(getClass()); + @Test public void testListenerConnectedAtStart() throws Exception { @@ -160,11 +164,10 @@ public class TestFrameworkBackground extends BaseClassForTests } }; client.create().inBackground(callback).forPath("/one"); - client.create().inBackground(callback).forPath("/one/two"); - client.create().inBackground(callback).forPath("/one/two/three"); - Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one"); + client.create().inBackground(callback).forPath("/one/two"); Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one/two"); + client.create().inBackground(callback).forPath("/one/two/three"); Assert.assertEquals(paths.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), "/one/two/three"); } finally http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 0ec796b..e399a4d 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -45,6 +45,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -380,11 +384,12 @@ public class TestReconfiguration extends BaseClassForTests } } - private List<String> toReconfigSpec(Collection<InstanceSpec> instances) + private List<String> toReconfigSpec(Collection<InstanceSpec> instances) throws Exception { + String localhost = new InetSocketAddress((InetAddress)null, 0).getAddress().getHostAddress(); List<String> specs = Lists.newArrayList(); for ( InstanceSpec instance : instances ) { - specs.add("server." + instance.getServerId() + "=localhost:" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort()); + specs.add("server." + instance.getServerId() + "=" + localhost + ":" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort()); } return specs; } http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java index 38c632a..f7a4ff4 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java @@ -114,25 +114,13 @@ public class PersistentEphemeralNode implements Closeable { if ( newState == ConnectionState.RECONNECTED ) { - if ( debugReconnectLatch != null ) - { - try - { - debugReconnectLatch.await(); - } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - e.printStackTrace(); - } - } createNode(); } } }; @VisibleForTesting - volatile CountDownLatch debugReconnectLatch = null; + volatile CountDownLatch debugCreateNodeLatch = null; private enum State { @@ -401,6 +389,19 @@ public class PersistentEphemeralNode implements Closeable return; } + if ( debugCreateNodeLatch != null ) + { + try + { + debugCreateNodeLatch.await(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + try { String existingPath = nodePath.get(); http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java index 82de1fc..f90f463 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java @@ -35,6 +35,11 @@ public class TestCleanState try { CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client; + EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker(); + if ( ensembleTracker != null ) + { + ensembleTracker.close(); + } ZooKeeper zooKeeper = internalClient.getZooKeeper(); if ( zooKeeper != null ) { @@ -52,6 +57,10 @@ public class TestCleanState } } } + catch ( IllegalStateException ignore ) + { + // client already closed + } catch ( Exception e ) { e.printStackTrace(); // not sure what to do here http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java index ee49288..c06d042 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java @@ -70,6 +70,12 @@ public class TestInterProcessSemaphoreCluster } @Override + public boolean updateServerListEnabled() + { + return false; + } + + @Override public void start() throws Exception { } http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java index 4162886..0ee6dec 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java @@ -306,7 +306,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests CuratorFramework observer = newCurator(); PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]); - node.debugReconnectLatch = new CountDownLatch(1); node.start(); try { @@ -317,11 +316,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests Trigger deletedTrigger = Trigger.deleted(); observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath()); + node.debugCreateNodeLatch = new CountDownLatch(1); KillSession.kill(curator.getZookeeperClient().getZooKeeper()); // Make sure the node got deleted assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS)); - node.debugReconnectLatch.countDown(); + node.debugCreateNodeLatch.countDown(); } finally { @@ -336,7 +336,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests CuratorFramework observer = newCurator(); PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, new byte[0]); - node.debugReconnectLatch = new CountDownLatch(1); node.start(); try { @@ -346,11 +345,12 @@ public class TestPersistentEphemeralNode extends BaseClassForTests Trigger deletedTrigger = Trigger.deleted(); observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath()); + node.debugCreateNodeLatch = new CountDownLatch(1); KillSession.kill(curator.getZookeeperClient().getZooKeeper()); // Make sure the node got deleted... assertTrue(deletedTrigger.firedWithin(timing.forSessionSleep().seconds(), TimeUnit.SECONDS)); - node.debugReconnectLatch.countDown(); + node.debugCreateNodeLatch.countDown(); // Check for it to be recreated... Trigger createdTrigger = Trigger.created(); @@ -380,16 +380,16 @@ public class TestPersistentEphemeralNode extends BaseClassForTests // We should be able to disconnect multiple times and each time the node should be recreated. for ( int i = 0; i < 5; i++ ) { - node.debugReconnectLatch = new CountDownLatch(1); Trigger deletionTrigger = Trigger.deleted(); observer.checkExists().usingWatcher(deletionTrigger).forPath(path); + node.debugCreateNodeLatch = new CountDownLatch(1); // Kill the session, thus cleaning up the node... KillSession.kill(curator.getZookeeperClient().getZooKeeper()); // Make sure the node ended up getting deleted... assertTrue(deletionTrigger.firedWithin(timing.multiple(1.5).forSessionSleep().seconds(), TimeUnit.SECONDS)); - node.debugReconnectLatch.countDown(); + node.debugCreateNodeLatch.countDown(); // Now put a watch in the background looking to see if it gets created... Trigger creationTrigger = Trigger.created(); @@ -706,6 +706,10 @@ public class TestPersistentEphemeralNode extends BaseClassForTests { latch.countDown(); } + else if ( type != EventType.None ) + { + Assert.fail("Unexpected watcher event: " + event); + } } public boolean firedWithin(long duration, TimeUnit unit) http://git-wip-us.apache.org/repos/asf/curator/blob/cb34e6f6/src/site/confluence/utilities.confluence ---------------------------------------------------------------------- diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence index efacb3c..3a62fa5 100644 --- a/src/site/confluence/utilities.confluence +++ b/src/site/confluence/utilities.confluence @@ -38,7 +38,8 @@ Due to limitations in ZooKeeper's transport layer, a single queue will break if provides a facade over multiple distributed queues. It monitors the queues and if any one of them goes over a threshold, a new queue is added. Puts are distributed amongst the queues. -h2. EnsembleTracker +h2. WatcherRemoveCuratorFramework -Utility to listen for ensemble/configuration changes via registered EnsembleListeners. Allocate a EnsembleTracker, add one or more listeners -and start it. +Curator has a utility that makes it easy to set watchers and remove them at a later date. It is used for all Curator recipes. +From your CuratorFramework instance, call newWatcherRemoveCuratorFramework(). When using this proxy instance any watchers that are +set are recorded. You can then call removeWatchers() to remove those watchers. See the Curator source code for usage details.