Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 850b6e4e4 -> 6d41d4a80


Using only the Stat version to detect a node change (for the CHILD_UPDATED
event) is not enough. In a split brain, if a node delete plus a recreate is
missed, the Stat version can appear to be the same (for example, when both the
original node and the recreated node are still at their initial version). So,
when data is available in the cache, compare the data, not the Stat version.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/127efd5e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/127efd5e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/127efd5e

Branch: refs/heads/CURATOR-3.0
Commit: 127efd5ee1fc186b5027717119c953c85267f506
Parents: caf6406
Author: randgalt <randg...@apache.org>
Authored: Thu Mar 9 09:50:11 2017 -0800
Committer: randgalt <randg...@apache.org>
Committed: Thu Mar 9 09:50:21 2017 -0800

----------------------------------------------------------------------
 .../recipes/cache/PathChildrenCache.java        | 12 +++-
 .../cache/TestPathChildrenCacheInCluster.java   | 60 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/127efd5e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 568d03d..c39b2c7 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -694,7 +695,7 @@ public class PathChildrenCache implements Closeable
             {
                 offerOperation(new EventOperation(this, new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
             }
-            else if ( previousData.getStat().getVersion() != stat.getVersion() 
)
+            else if ( hasChanged(stat, previousData, data) )
             {
                 offerOperation(new EventOperation(this, new 
PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
             }
@@ -702,6 +703,15 @@ public class PathChildrenCache implements Closeable
         }
     }
 
+    private boolean hasChanged(Stat stat, ChildData previousData, ChildData 
newData)
+    {
+        if ( cacheData )
+        {
+            return !Arrays.equals(previousData.getData(), newData.getData());
+        }
+        return previousData.getStat().getVersion() != stat.getVersion();
+    }
+
     private void updateInitialSet(String name, ChildData data)
     {
         Map<String, ChildData> localInitialSet = initialSet.get();

http://git-wip-us.apache.org/repos/asf/curator/blob/127efd5e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
index 1144ede..b3abca9 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
+import com.google.common.collect.Queues;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -27,6 +28,7 @@ import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.Timing;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -34,6 +36,64 @@ import java.util.concurrent.atomic.AtomicReference;
 public class TestPathChildrenCacheInCluster
 {
     @Test
+    public void testMissedDelete() throws Exception
+    {
+        Timing timing = new Timing();
+        PathChildrenCache cache = null;
+        CuratorFramework client1 = null;
+        CuratorFramework client2 = null;
+        TestingCluster cluster = new TestingCluster(3);
+        try
+        {
+            cluster.start();
+
+            // client 1 only connects to 1 server
+            InstanceSpec client1Instance = 
cluster.getInstances().iterator().next();
+            client1 = 
CuratorFrameworkFactory.newClient(client1Instance.getConnectString(), 1000, 
1000, new RetryOneTime(1));
+            cache = new PathChildrenCache(client1, "/test", true);
+            final BlockingQueue<PathChildrenCacheEvent.Type> events = 
Queues.newLinkedBlockingQueue();
+            PathChildrenCacheListener listener = new 
PathChildrenCacheListener()
+            {
+                @Override
+                public void childEvent(CuratorFramework client, 
PathChildrenCacheEvent event) throws Exception
+                {
+                    events.add(event.getType());
+                }
+            };
+            cache.getListenable().addListener(listener);
+
+            client2 = 
CuratorFrameworkFactory.newClient(cluster.getConnectString(), 1000, 1000, new 
RetryOneTime(1));
+
+            client1.start();
+            client2.start();
+            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.INITIALIZED);
+
+            client2.create().creatingParentsIfNeeded().forPath("/test/node", 
"first".getBytes());
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED);
+
+            cluster.killServer(client1Instance);
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED);
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_LOST);
+
+            client2.delete().forPath("/test/node");
+            client2.create().forPath("/test/node", "second".getBytes());
+            cluster.restartServer(client1Instance);
+
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED);
+            Assert.assertEquals(events.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED);  // 
"/test/node" is different - should register as updated
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client1);
+            CloseableUtils.closeQuietly(client2);
+            CloseableUtils.closeQuietly(cache);
+            CloseableUtils.closeQuietly(cluster);
+        }
+    }
+
+    @Test
     public void     testServerLoss() throws Exception
     {
         Timing                  timing = new Timing();

Reply via email to