This is an automated email from the ASF dual-hosted git repository. randgalt pushed a commit to branch CURATOR-623 in repository https://gitbox.apache.org/repos/asf/curator.git
commit 5c906401e4575605ae4d83c99b8277d62fc9a885 Author: randgalt <randg...@apache.org> AuthorDate: Sun Nov 14 13:00:51 2021 +0000 `ChildrenCache` (used by Queues) didn't have a `ConnectionStateListener`. Thus, if a long network partition occurred, the ZooKeeper instance would be recreated, losing any watchers that had been set, and `ChildrenCache` would stop receiving change notifications. Adding a `ConnectionStateListener` that re-syncs the children (re-registering the watcher) on CONNECTED and RECONNECTED fixes this. --- .../framework/recipes/queue/ChildrenCache.java | 34 +++++--- .../recipes/queue/TestLongNetworkPartition.java | 98 ++++++++++++++++++++++ 2 files changed, 120 insertions(+), 12 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java index e5c7e8c..a28a1cc 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java @@ -25,6 +25,9 @@ import org.apache.curator.framework.WatcherRemoveCuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import java.io.Closeable; @@ -33,7 +36,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; class ChildrenCache implements Closeable { @@ -49,7 +51,7 @@ class ChildrenCache implements Closeable { if ( !isClosed.get() ) { - sync(true); + sync(); } } }; @@ -66,6 +68,19 @@ class ChildrenCache
implements Closeable } }; + private final ConnectionStateListener connectionStateListener = (__, newState) -> { + if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) { + try + { + sync(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + }; + static class Data { final List<String> children; @@ -86,13 +101,15 @@ class ChildrenCache implements Closeable void start() throws Exception { - sync(true); + client.getConnectionStateListenable().addListener(connectionStateListener); + sync(); } @Override public void close() throws IOException { client.removeWatchers(); + client.getConnectionStateListenable().removeListener(connectionStateListener); isClosed.set(true); notifyFromCallback(); } @@ -137,16 +154,9 @@ class ChildrenCache implements Closeable notifyAll(); } - private synchronized void sync(boolean watched) throws Exception + private synchronized void sync() throws Exception { - if ( watched ) - { - client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); - } - else - { - client.getChildren().inBackground(callback).forPath(path); - } + client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); } private synchronized void setNewChildren(List<String> newChildren) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java new file mode 100644 index 0000000..2e3c7a5 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.queue; + +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.Timing2; +import org.junit.jupiter.api.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class TestLongNetworkPartition { + private static final Timing2 timing = new Timing2(); + + // test for https://issues.apache.org/jira/browse/CURATOR-623 + @Test + public void testLongNetworkPartition() throws Exception { + final CompletableFuture<Void> done = new CompletableFuture<>(); + try (final TestingCluster testingCluster = started(new TestingCluster(1)); + final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); + final DistributedQueue<String> dyingQueue = newQueue(dyingCuratorFramework, item -> { + if ( item.equals("0") ) + { + done.complete(null); + } + })) + { + dyingQueue.start(); + testingCluster.killServer(testingCluster.getInstances().iterator().next()); + timing.forSessionSleep().multiple(2).sleep(); + 
testingCluster.restartServer(testingCluster.getInstances().iterator().next()); + try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); + final DistributedQueue<String> aliveQueue = newQueue(aliveCuratorFramework, null)) + { + aliveQueue.start(); + aliveQueue.put("0"); + done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + } + } + } + + private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) { + curatorFramework.start(); + return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer<String>() { + @Override + public void consumeMessage(String o) { + consumer.accept(o); + } + + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + } + }, new QueueSerializer<String>() { + @Override + public byte[] serialize(String item) { + return item.getBytes(); + } + + @Override + public String deserialize(byte[] bytes) { + return new String(bytes); + } + }, "/MyChildrenCacheTest/queue").buildQueue(); + } + + private static TestingCluster started(TestingCluster testingCluster) throws Exception { + testingCluster.start(); + return testingCluster; + } + + private static CuratorFramework getCuratorFramework(String connectString) { + return CuratorFrameworkFactory.builder() + .ensembleProvider(new FixedEnsembleProvider(connectString, true)) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + } +} \ No newline at end of file