Repository: curator Updated Branches: refs/heads/CURATOR-3.0 850b6e4e4 -> 6d41d4a80
Using only Stat version to note node change (for UPDATE event) is not enough. In a split brain if a node delete plus a recreate is missed, the Stat version can appear to be the same. So, when data is available in the cache, compare the data not the stat version Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/127efd5e Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/127efd5e Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/127efd5e Branch: refs/heads/CURATOR-3.0 Commit: 127efd5ee1fc186b5027717119c953c85267f506 Parents: caf6406 Author: randgalt <randg...@apache.org> Authored: Thu Mar 9 09:50:11 2017 -0800 Committer: randgalt <randg...@apache.org> Committed: Thu Mar 9 09:50:21 2017 -0800 ---------------------------------------------------------------------- .../recipes/cache/PathChildrenCache.java | 12 +++- .../cache/TestPathChildrenCacheInCluster.java | 60 ++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/127efd5e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java index 568d03d..c39b2c7 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java @@ -45,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -694,7 +695,7 @@ public class PathChildrenCache implements Closeable { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data))); } - else if ( previousData.getStat().getVersion() != stat.getVersion() ) + else if ( hasChanged(stat, previousData, data) ) { offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data))); } @@ -702,6 +703,15 @@ public class PathChildrenCache implements Closeable } } + private boolean hasChanged(Stat stat, ChildData previousData, ChildData newData) + { + if ( cacheData ) + { + return !Arrays.equals(previousData.getData(), newData.getData()); + } + return previousData.getStat().getVersion() != stat.getVersion(); + } + private void updateInitialSet(String name, ChildData data) { Map<String, ChildData> localInitialSet = initialSet.get(); http://git-wip-us.apache.org/repos/asf/curator/blob/127efd5e/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java index 1144ede..b3abca9 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCacheInCluster.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.recipes.cache; +import com.google.common.collect.Queues; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -27,6 +28,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -34,6 +36,64 @@ import java.util.concurrent.atomic.AtomicReference; public class TestPathChildrenCacheInCluster { @Test + public void testMissedDelete() throws Exception + { + Timing timing = new Timing(); + PathChildrenCache cache = null; + CuratorFramework client1 = null; + CuratorFramework client2 = null; + TestingCluster cluster = new TestingCluster(3); + try + { + cluster.start(); + + // client 1 only connects to 1 server + InstanceSpec client1Instance = cluster.getInstances().iterator().next(); + client1 = CuratorFrameworkFactory.newClient(client1Instance.getConnectString(), 1000, 1000, new RetryOneTime(1)); + cache = new PathChildrenCache(client1, "/test", true); + final BlockingQueue<PathChildrenCacheEvent.Type> events = Queues.newLinkedBlockingQueue(); + PathChildrenCacheListener listener = new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + events.add(event.getType()); + } + }; + cache.getListenable().addListener(listener); + + client2 = CuratorFrameworkFactory.newClient(cluster.getConnectString(), 1000, 1000, new RetryOneTime(1)); + + client1.start(); + client2.start(); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.INITIALIZED); + + client2.create().creatingParentsIfNeeded().forPath("/test/node", "first".getBytes()); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_ADDED); + + cluster.killServer(client1Instance); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_LOST); + + client2.delete().forPath("/test/node"); + client2.create().forPath("/test/node", "second".getBytes()); + cluster.restartServer(client1Instance); + + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED); + Assert.assertEquals(events.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), PathChildrenCacheEvent.Type.CHILD_UPDATED); // "/test/node" is different - should register as updated + } + finally + { + CloseableUtils.closeQuietly(client1); + CloseableUtils.closeQuietly(client2); + CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(cluster); + } + } + + @Test public void testServerLoss() throws Exception { Timing timing = new Timing();