Repository: curator Updated Branches: refs/heads/master c65e09141 -> 6e16d0d5c
CURATOR-154 - Modified the handling for creating the ephemeral node so that if it already exists, an attempt will be made to set its data to match the data that the PersistentEphemeralNode has cached. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/64973b0d Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/64973b0d Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/64973b0d Branch: refs/heads/master Commit: 64973b0d91c0625b00d400227e6b1971233df595 Parents: 1c194b4 Author: Cameron McKenzie <came...@unico.com.au> Authored: Wed Oct 29 16:32:35 2014 +1100 Committer: Cameron McKenzie <came...@unico.com.au> Committed: Wed Oct 29 16:32:35 2014 +1100 ---------------------------------------------------------------------- .../recipes/nodes/PersistentEphemeralNode.java | 52 ++++++++++-- .../nodes/TestPersistentEphemeralNode.java | 84 +++++++++++++++++++- 2 files changed, 124 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java index d78573c..41c04f6 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.nodes; import com.google.common.base.Preconditions; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; import org.apache.curator.framework.api.BackgroundCallback; @@ -31,14 +32,17 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; + import org.apache.curator.utils.PathUtils; /** @@ -67,7 +71,10 @@ public class PersistentEphemeralNode implements Closeable @Override public void process(WatchedEvent event) { - createNode(); + if ( event.getType() == EventType.NodeDeleted) + { + createNode(); + } } }; private final BackgroundCallback checkExistsCallback = new BackgroundCallback() @@ -81,6 +88,21 @@ public class PersistentEphemeralNode implements Closeable } } }; + private final BackgroundCallback setDataCallback = new BackgroundCallback() { + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) + throws Exception { + //If the result is ok then initialisation is complete (if we're still initialising) + //Don't retry on other errors as the only recoverable cases will be connection loss + //and the node not existing, both of which are already handled by other watches. + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + //Update is ok, mark initialisation as complete if required. + initialisationComplete(); + } + } + }; private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -188,12 +210,12 @@ public class PersistentEphemeralNode implements Closeable * @param basePath the base path for the node * @param data data for the node */ - public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] data) + public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] initData) { this.client = Preconditions.checkNotNull(client, "client cannot be null"); this.basePath = PathUtils.validatePath(basePath); this.mode = Preconditions.checkNotNull(mode, "mode cannot be null"); - data = Preconditions.checkNotNull(data, "data cannot be null"); + final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null"); backgroundCallback = new BackgroundCallback() { @@ -201,9 +223,11 @@ public class PersistentEphemeralNode implements Closeable public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { String path = null; + boolean nodeExists = false; if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue() ) - { - path = event.getPath(); + { + path = event.getPath(); + nodeExists = true; } else if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { @@ -214,10 +238,13 @@ public class PersistentEphemeralNode implements Closeable nodePath.set(path); watchNode(); - CountDownLatch localLatch = initialCreateLatch.getAndSet(null); - if ( localLatch != null ) + if(nodeExists) { - localLatch.countDown(); + client.setData().inBackground(setDataCallback).forPath(getActualPath(), data); + } + else + { + initialisationComplete(); } } else @@ -230,6 +257,15 @@ public class PersistentEphemeralNode implements Closeable createMethod = mode.isProtected() ? client.create().creatingParentsIfNeeded().withProtection() : client.create().creatingParentsIfNeeded(); this.data.set(Arrays.copyOf(data, data.length)); } + + private void initialisationComplete() + { + CountDownLatch localLatch = initialCreateLatch.getAndSet(null); + if ( localLatch != null ) + { + localLatch.countDown(); + } + } /** * You must call start() to initiate the persistent ephemeral node. An attempt to create the node http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java index 47ae757..31e7ef2 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java @@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.nodes; import com.google.common.base.Throwables; import com.google.common.collect.Lists; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; @@ -31,12 +32,15 @@ import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.data.Stat; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; + import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -75,7 +79,7 @@ public class TestPersistentEphemeralNode extends BaseClassForTests @Test public void testListenersReconnectedIsFast() throws Exception { - server.close(); + server.stop(); CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); try @@ -103,13 +107,13 @@ public class TestPersistentEphemeralNode extends BaseClassForTests }; client.getConnectionStateListenable().addListener(listener); timing.sleepABit(); - server = new TestingServer(server.getPort()); + server.restart(); Assert.assertTrue(timing.awaitLatch(connectedLatch)); timing.sleepABit(); Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); - server.close(); + server.stop(); timing.sleepABit(); - server = new TestingServer(server.getPort()); + server.restart(); timing.sleepABit(); Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); } @@ -459,6 +463,78 @@ public class TestPersistentEphemeralNode extends BaseClassForTests node.close(); } } + + /** + * Test that if a persistent ephemeral node is created and the node already exists + * that if data is present in the PersistentEphermalNode that it is still set. + * @throws Exception + */ + @Test + public void testSetDataWhenNodeExists() throws Exception + { + CuratorFramework curator = newCurator(); + curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "InitialData".getBytes()); + + byte[] data = "Hello World".getBytes(); + + PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data); + node.start(); + try + { + node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), data)); + } + finally + { + node.close(); + } + } + + @Test + public void testSetDataWhenDisconnected() throws Exception + { + CuratorFramework curator = newCurator(); + + byte[] initialData = "Hello World".getBytes(); + byte[] updatedData = "Updated".getBytes(); + + PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData); + node.start(); + try + { + node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData)); + + server.stop(); + + final CountDownLatch dataUpdateLatch = new CountDownLatch(1); + + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + if ( event.getType() == EventType.NodeDataChanged ) + { + dataUpdateLatch.countDown(); + } + } + }; + + curator.getData().usingWatcher(watcher).inBackground().forPath(node.getActualPath()); + + node.setData(updatedData); + server.restart(); + + assertTrue(timing.awaitLatch(dataUpdateLatch)); + + assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData)); + } + finally + { + node.close(); + } + } private void assertNodeExists(CuratorFramework curator, String path) throws Exception {