Repository: curator Updated Branches: refs/heads/CURATOR-351 d0299661e -> a9cbdd31d
initial test and doc updates Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/994778c3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/994778c3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/994778c3 Branch: refs/heads/CURATOR-351 Commit: 994778c31242942c43da741c5124d8c0f2320ea0 Parents: d029966 Author: randgalt <randg...@apache.org> Authored: Fri Dec 9 14:39:35 2016 +0100 Committer: randgalt <randg...@apache.org> Committed: Fri Dec 9 14:39:35 2016 +0100 ---------------------------------------------------------------------- .../org/apache/curator/utils/ThreadUtils.java | 4 +- .../framework/recipes/nodes/PersistentNode.java | 2 +- .../recipes/nodes/PersistentTtlNode.java | 190 +++++++++++++++++++ .../src/site/confluence/index.confluence | 3 +- .../persistent-ephemeral-node.confluence | 2 +- .../confluence/persistent-ttl-node.confluence | 39 ++++ .../recipes/nodes/TestPersistentTtlNode.java | 58 ++++++ 7 files changed, 294 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java index 74b4e40..bc93604 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java +++ b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java @@ -31,12 +31,14 @@ public class ThreadUtils { private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class); - public static void checkInterrupted(Throwable e) + public static boolean checkInterrupted(Throwable e) { if ( e instanceof InterruptedException ) { Thread.currentThread().interrupt(); + return true; } + return false; } public static ExecutorService newSingleThreadExecutor(String processName) http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java index eaa91b7..e956266 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java @@ -386,7 +386,7 @@ public class PersistentNode implements Closeable return this.data.get(); } - private void deleteNode() throws Exception + protected void deleteNode() throws Exception { String localNodePath = nodePath.getAndSet(null); if ( localNodePath != null ) http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java new file mode 100644 index 0000000..3eda9e2 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentTtlNode.java @@ -0,0 +1,190 @@ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ThreadUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * <p> + * Manages a {@link PersistentNode} that uses {@link CreateMode#CONTAINER}. Asynchronously + * it creates or updates a child on the persistent node that is marked with a provided TTL. + * </p> + * + * <p> + * The effect of this is to have a node that can be watched, etc. The child node serves as + * a method of having the parent node deleted if the TTL expires. i.e. if the process + * that is running the PersistentTtlNode crashes and the TTL elapses, first the child node + * will be deleted due to the TTL expiration and then the parent node will be deleted as it's + * a container node with no children. + * </p> + * + * <p> + * PersistentTtlNode is useful when you need to create a TTL node but don't want to keep + * it alive manually by periodically setting data - PersistentTtlNode does that for you. Further + * the keep-alive is done in a way that does not generate watch triggers on the parent node. + * </p> + */ +public class PersistentTtlNode implements Closeable +{ + public static final String DEFAULT_CHILD_NODE_NAME = "touch"; + public static final int DEFAULT_TOUCH_SCHEDULE_FACTOR = 2; + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final PersistentNode node; + private final CuratorFramework client; + private final long ttlMs; + private final int touchScheduleFactor; + private final ScheduledExecutorService executorService; + private final AtomicReference<Future<?>> futureRef = new AtomicReference<>(); + private final String childPath; + + /** + * @param client the client + * @param path path for the parent ZNode + * @param ttlMs max ttl for the node in milliseconds + * @param initData data for the node + */ + public PersistentTtlNode(CuratorFramework client, String path, long ttlMs, byte[] initData) + { + this(client, Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("PersistentTtlNode")), path, ttlMs, initData, DEFAULT_CHILD_NODE_NAME, DEFAULT_TOUCH_SCHEDULE_FACTOR); + } + + /** + * @param client the client + * @param path path for the parent ZNode + * @param ttlMs max ttl for the node in milliseconds + * @param initData data for the node + * @param childNodeName name to use for the child node of the node created at <code>path</code> + * @param touchScheduleFactor how ofter to set/create the child node as a factor of the ttlMs. i.e. + * the child is touched every <code>(ttlMs / touchScheduleFactor)</code> + */ + public PersistentTtlNode(CuratorFramework client, ScheduledExecutorService executorService, String path, long ttlMs, byte[] initData, String childNodeName, int touchScheduleFactor) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.ttlMs = ttlMs; + this.touchScheduleFactor = touchScheduleFactor; + node = new PersistentNode(client, CreateMode.CONTAINER, false, path, initData) + { + @Override + protected void deleteNode() + { + // NOP + } + }; + this.executorService = Objects.requireNonNull(executorService, "executorService cannot be null"); + childPath = ZKPaths.makePath(Objects.requireNonNull(path, "path cannot be null"), childNodeName); + } + + /** + * You must call start() to initiate the persistent ttl node + */ + public void start() + { + node.start(); + + Runnable touchTask = new Runnable() + { + @Override + public void run() + { + try + { + try + { + client.setData().forPath(childPath); + } + catch ( KeeperException.NoNodeException e ) + { + client.create().orSetData().withTtl(ttlMs).withMode(CreateMode.PERSISTENT_WITH_TTL).forPath(childPath); + } + } + catch ( KeeperException.NoNodeException ignore ) + { + // ignore + } + catch ( Exception e ) + { + if ( !ThreadUtils.checkInterrupted(e) ) + { + log.debug("Could not touch child node", e); + } + } + } + }; + Future<?> future = executorService.scheduleAtFixedRate(touchTask, ttlMs / touchScheduleFactor, ttlMs / touchScheduleFactor, TimeUnit.MILLISECONDS); + futureRef.set(future); + } + + /** + * Block until the either initial node creation initiated by {@link #start()} succeeds or + * the timeout elapses. + * + * @param timeout the maximum time to wait + * @param unit time unit + * @return if the node was created before timeout + * @throws InterruptedException if the thread is interrupted + */ + public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException + { + return node.waitForInitialCreate(timeout, unit); + } + + /** + * Set data that node should set in ZK also writes the data to the node. NOTE: it + * is an error to call this method after {@link #start()} but before the initial create + * has completed. Use {@link #waitForInitialCreate(long, TimeUnit)} to ensure initial + * creation. + * + * @param data new data value + * @throws Exception errors + */ + public void setData(byte[] data) throws Exception + { + node.setData(data); + } + + /** + * Return the current value of our data + * + * @return our data + */ + public byte[] getData() + { + return node.getData(); + } + + /** + * Call when you are done with the PersistentTtlNode. Note: the ZNode is <em>not</em> immediately + * deleted. However, if no other PersistentTtlNode with the same path is running the node will get deleted + * based on the ttl. + */ + @Override + public void close() + { + Future<?> future = futureRef.getAndSet(null); + if ( future != null ) + { + future.cancel(true); + } + try + { + node.close(); + } + catch ( IOException e ) + { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence index 01dfc7e..4cf2cde 100644 --- a/curator-recipes/src/site/confluence/index.confluence +++ b/curator-recipes/src/site/confluence/index.confluence @@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths". |[[Tree Cache|tree-cache.html]] \- A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.| ||Nodes|| -|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| +|[[Persistent Node|persistent-ephemeral-node.html]] \- A node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| +|[[Persistent TTL Node|persistent-ttl-node.html]] \- Useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data.| |[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| ||Queues|| http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence index aeb9e10..5baca09 100644 --- a/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence +++ b/curator-recipes/src/site/confluence/persistent-ephemeral-node.confluence @@ -1,4 +1,4 @@ -h1. Persistent Ephemeral Node +h1. Persistent Node h2. Description A persistent node is a node that attempts to stay present in ZooKeeper, even through connection and session interruptions. http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/site/confluence/persistent-ttl-node.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/persistent-ttl-node.confluence b/curator-recipes/src/site/confluence/persistent-ttl-node.confluence new file mode 100644 index 0000000..acd2d33 --- /dev/null +++ b/curator-recipes/src/site/confluence/persistent-ttl-node.confluence @@ -0,0 +1,39 @@ +h1. Persistent TTL Node + +h2. Description +PersistentTtlNode is useful when you need to create a TTL node but don't want to keep it alive manually by periodically setting data \- +PersistentTtlNode does that for you. Further the keep\-alive is done in a way that does not generate watch triggers on the parent node. + +h2. Participating Classes +* PersistentNode +* PersistentTtlNode + +h2. Usage +h3. Creating a PersistentTtlNode +{code} +public PersistentTtlNode(CuratorFramework client, + String path, + long ttlMs, + byte[] initData) +Parameters: +client - client instance +path path for the parent ZNode +ttlMs max ttl for the node in milliseconds +initData - initData for the node +{code} + +h3. General Usage +PersistentTtlNode must be started: +{code} +node.start(); +{code} + +When you are through with the PersistentTtlNode instance, you should call close: +{code} +node.close(); +{code} + +NOTE: this will NOT delete the node immediately. The node will get deleted based on the ttl. + +h2. Error Handling +PersistentTtlNode instances internally handle all error states recreating the node as necessary. http://git-wip-us.apache.org/repos/asf/curator/blob/994778c3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java new file mode 100644 index 0000000..9ae8df9 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentTtlNode.java @@ -0,0 +1,58 @@ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.concurrent.TimeUnit; + +public class TestPersistentTtlNode extends BaseClassForTests +{ + private final Timing timing = new Timing(); + + @BeforeMethod + @Override + public void setup() throws Exception + { + System.setProperty("znode.container.checkIntervalMs", "1"); + super.setup(); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + System.clearProperty("znode.container.checkIntervalMs"); + super.teardown(); + } + + @Test + public void testBasic() throws Exception + { + try (CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1))) + { + client.start(); + + try (PersistentTtlNode node = new PersistentTtlNode(client, "/test", 10, new byte[0])) + { + node.start(); + node.waitForInitialCreate(timing.session(), TimeUnit.MILLISECONDS); + + for ( int i = 0; i < 10; ++i ) + { + Thread.sleep(10); + Assert.assertNotNull(client.checkExists().forPath("/test")); + } + } + + timing.sleepABit(); + + Assert.assertNull(client.checkExists().forPath("/test")); + } + } +}