Repository: curator
Updated Branches:
  refs/heads/master c65e09141 -> 6e16d0d5c


CURATOR-154 - Modified the handling for creating the ephemeral node so
that if it already exists, an attempt will be made to set its data to
match the data that the PersistentEphemeralNode has cached.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/64973b0d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/64973b0d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/64973b0d

Branch: refs/heads/master
Commit: 64973b0d91c0625b00d400227e6b1971233df595
Parents: 1c194b4
Author: Cameron McKenzie <came...@unico.com.au>
Authored: Wed Oct 29 16:32:35 2014 +1100
Committer: Cameron McKenzie <came...@unico.com.au>
Committed: Wed Oct 29 16:32:35 2014 +1100

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  | 52 ++++++++++--
 .../nodes/TestPersistentEphemeralNode.java      | 84 +++++++++++++++++++-
 2 files changed, 124 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index d78573c..41c04f6 100644
--- 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -31,14 +32,17 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.curator.utils.PathUtils;
 
 /**
@@ -67,7 +71,10 @@ public class PersistentEphemeralNode implements Closeable
         @Override
         public void process(WatchedEvent event)
         {
-            createNode();
+               if ( event.getType() == EventType.NodeDeleted)
+               {
+                       createNode();
+               }
         }
     };
     private final BackgroundCallback checkExistsCallback = new 
BackgroundCallback()
@@ -81,6 +88,21 @@ public class PersistentEphemeralNode implements Closeable
             }
         }
     };
+    private final BackgroundCallback setDataCallback = new 
BackgroundCallback() {
+               
+               @Override
+               public void processResult(CuratorFramework client, CuratorEvent 
event)
+                               throws Exception {
+                       //If the result is ok then initialisation is complete 
(if we're still initialising)
+                       //Don't retry on other errors as the only recoverable 
cases will be connection loss
+                       //and the node not existing, both of which are already 
handled by other watches.
+                       if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
+                       {
+                               //Update is ok, mark initialisation as complete 
if required.
+                               initialisationComplete();
+                       }
+               }
+       };      
     private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
     {
         @Override
@@ -188,12 +210,12 @@ public class PersistentEphemeralNode implements Closeable
      * @param basePath the base path for the node
      * @param data     data for the node
      */
-    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String 
basePath, byte[] data)
+    public PersistentEphemeralNode(CuratorFramework client, Mode mode, String 
basePath, byte[] initData)
     {
         this.client = Preconditions.checkNotNull(client, "client cannot be 
null");
         this.basePath = PathUtils.validatePath(basePath);
         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
-        data = Preconditions.checkNotNull(data, "data cannot be null");
+        final byte[] data = Preconditions.checkNotNull(initData, "data cannot 
be null");
 
         backgroundCallback = new BackgroundCallback()
         {
@@ -201,9 +223,11 @@ public class PersistentEphemeralNode implements Closeable
             public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception
             {
                 String path = null;
+                boolean nodeExists = false;
                 if ( event.getResultCode() == 
KeeperException.Code.NODEEXISTS.intValue() )
-                {
-                    path = event.getPath();
+                {                      
+                       path = event.getPath();
+                       nodeExists = true;
                 }
                 else if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
                 {
@@ -214,10 +238,13 @@ public class PersistentEphemeralNode implements Closeable
                     nodePath.set(path);
                     watchNode();
 
-                    CountDownLatch localLatch = 
initialCreateLatch.getAndSet(null);
-                    if ( localLatch != null )
+                    if(nodeExists)
                     {
-                        localLatch.countDown();
+                       
client.setData().inBackground(setDataCallback).forPath(getActualPath(), data);
+                    }
+                    else
+                    {
+                       initialisationComplete();
                     }
                 }
                 else
@@ -230,6 +257,15 @@ public class PersistentEphemeralNode implements Closeable
         createMethod = mode.isProtected() ? 
client.create().creatingParentsIfNeeded().withProtection() : 
client.create().creatingParentsIfNeeded();
         this.data.set(Arrays.copyOf(data, data.length));
     }
+    
+    private void initialisationComplete()
+    {
+        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+        if ( localLatch != null )
+        {
+            localLatch.countDown();
+        }
+    }
 
     /**
      * You must call start() to initiate the persistent ephemeral node. An 
attempt to create the node

http://git-wip-us.apache.org/repos/asf/curator/blob/64973b0d/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 47ae757..31e7ef2 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
@@ -31,12 +32,15 @@ import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -75,7 +79,7 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
     @Test
     public void testListenersReconnectedIsFast() throws Exception
     {
-        server.close();
+        server.stop();
 
         CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
         try
@@ -103,13 +107,13 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             };
             client.getConnectionStateListenable().addListener(listener);
             timing.sleepABit();
-            server = new TestingServer(server.getPort());
+            server.restart();
             Assert.assertTrue(timing.awaitLatch(connectedLatch));
             timing.sleepABit();
             
Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
-            server.close();
+            server.stop();
             timing.sleepABit();
-            server = new TestingServer(server.getPort());
+            server.restart();
             timing.sleepABit();
             Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
         }
@@ -459,6 +463,78 @@ public class TestPersistentEphemeralNode extends 
BaseClassForTests
             node.close();
         }
     }
+    
+    /**
+     * Test that if a persistent ephemeral node is created and the node 
already exists
+     * that if data is present in the PersistentEphermalNode that it is still 
set. 
+     * @throws Exception
+     */
+    @Test
+    public void testSetDataWhenNodeExists() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH,
 "InitialData".getBytes());
+        
+        byte[] data = "Hello World".getBytes();
+             
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, 
PersistentEphemeralNode.Mode.EPHEMERAL, PATH, data);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), 
data));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
+    
+    @Test
+    public void testSetDataWhenDisconnected() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+        
+        byte[] initialData = "Hello World".getBytes();
+        byte[] updatedData = "Updated".getBytes();
+             
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, 
PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), 
initialData));
+            
+            server.stop();
+            
+            final CountDownLatch dataUpdateLatch = new CountDownLatch(1);
+            
+            Watcher watcher = new Watcher()
+            {
+                               @Override
+                               public void process(WatchedEvent event)
+                               {
+                                       if ( event.getType() == 
EventType.NodeDataChanged )
+                                       {
+                                               dataUpdateLatch.countDown();
+                                       }
+                               }               
+            };
+            
+            
curator.getData().usingWatcher(watcher).inBackground().forPath(node.getActualPath());
+            
+            node.setData(updatedData);
+            server.restart();
+
+            assertTrue(timing.awaitLatch(dataUpdateLatch));
+                       
+            
assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), 
updatedData));
+        }
+        finally
+        {
+            node.close();
+        }      
+    }
 
     private void assertNodeExists(CuratorFramework curator, String path) 
throws Exception
     {

Reply via email to