[ https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443319#comment-17443319 ]
Никита Соколов commented on CURATOR-623: ---------------------------------------- You are right about the test being faulty, but the problem is still real: I have made aliveQueue producer-only and the test still fails. This synthetic test does not exactly reproduce what we were trying to achieve in production, but it suffers from the same problem: ChildrenCache watchers do not reconnect after the server has been unreachable for a long period. {code:java} import junit.framework.TestCase; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.queue.DistributedQueue; import org.apache.curator.framework.recipes.queue.QueueBuilder; import org.apache.curator.framework.recipes.queue.QueueConsumer; import org.apache.curator.framework.recipes.queue.QueueSerializer; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class DistributedQueueTest extends TestCase { public void test() throws Exception { final var done = new CompletableFuture<>(); try ( final var testingCluster = started(new TestingCluster(1)); final var dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); final var dyingQueue = newQueue(dyingCuratorFramework, item -> { if (item.equals("0")) { done.complete(null); } }) ) { dyingQueue.start(); testingCluster.killServer(testingCluster.getInstances().iterator().next()); Thread.sleep(2 * 60_000); testingCluster.restartServer(testingCluster.getInstances().iterator().next()); try ( final var aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); final var aliveQueue = newQueue(aliveCuratorFramework, null) ) { 
aliveQueue.start(); aliveQueue.put("0"); done.get(1, TimeUnit.MINUTES); } } } private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) { curatorFramework.start(); return QueueBuilder.builder( curatorFramework, consumer == null ? null : new QueueConsumer<String>() { @Override public void consumeMessage(String o) { consumer.accept(o); } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { } }, new QueueSerializer<>() { @Override public byte[] serialize(String item) { return item.getBytes(); } @Override public String deserialize(byte[] bytes) { return new String(bytes); } }, "/MyChildrenCacheTest/queue" ).buildQueue(); } private static TestingCluster started(TestingCluster testingCluster) throws Exception { try { testingCluster.start(); return testingCluster; } catch (Throwable throwable) { try (testingCluster) { throw throwable; } } } private static CuratorFramework getCuratorFramework(String connectString) { return CuratorFrameworkFactory.builder() .ensembleProvider(new FixedEnsembleProvider(connectString, true)) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); } } {code} > DistributedQueue stops filling after long disconnect from cluster > ----------------------------------------------------------------- > > Key: CURATOR-623 > URL: https://issues.apache.org/jira/browse/CURATOR-623 > Project: Apache Curator > Issue Type: Bug > Reporter: Никита Соколов > Assignee: Jordan Zimmerman > Priority: Major > > One of our VMs had network down for 12 minutes and after the network was up, > the queues have stopped being filled by external processes as curator gave up > on all watchers. 
Here is a test reproducing the issue: > {code:java} > import junit.framework.TestCase; > import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; > import org.apache.curator.framework.CuratorFramework; > import org.apache.curator.framework.CuratorFrameworkFactory; > import org.apache.curator.framework.recipes.queue.DistributedQueue; > import org.apache.curator.framework.recipes.queue.QueueBuilder; > import org.apache.curator.framework.recipes.queue.QueueConsumer; > import org.apache.curator.framework.recipes.queue.QueueSerializer; > import org.apache.curator.framework.state.ConnectionState; > import org.apache.curator.retry.ExponentialBackoffRetry; > import org.apache.curator.test.TestingCluster; > import java.util.concurrent.CompletableFuture; > import java.util.concurrent.TimeUnit; > import java.util.function.Consumer; > public class DistributedQueueTest extends TestCase { > public void test() throws Exception { > final var done = new CompletableFuture<>(); > try ( > final var testingCluster = started(new TestingCluster(1)); > final var dyingCuratorFramework = > getCuratorFramework(testingCluster.getConnectString()); > final var dyingQueue = newQueue(dyingCuratorFramework, item -> { > if (item.equals("0")) { > done.complete(null); > } > }) > ) { > dyingQueue.start(); > > testingCluster.killServer(testingCluster.getInstances().iterator().next()); > Thread.sleep(2 * 60_000); > > testingCluster.restartServer(testingCluster.getInstances().iterator().next()); > try ( > final var aliveCuratorFramework = > getCuratorFramework(testingCluster.getConnectString()); > final var aliveQueue = newQueue(aliveCuratorFramework, __ -> > {}) > ) { > aliveQueue.start(); > aliveQueue.put("0"); > done.get(1, TimeUnit.MINUTES); > } > } > } > private static DistributedQueue<String> newQueue(CuratorFramework > curatorFramework, Consumer<String> consumer) { > curatorFramework.start(); > return QueueBuilder.builder( > curatorFramework, > new QueueConsumer<String>() { > @Override > 
public void consumeMessage(String o) { > consumer.accept(o); > } > @Override > public void stateChanged(CuratorFramework curatorFramework, > ConnectionState connectionState) { > } > }, > new QueueSerializer<>() { > @Override > public byte[] serialize(String item) { > return item.getBytes(); > } > @Override > public String deserialize(byte[] bytes) { > return new String(bytes); > } > }, > "/MyChildrenCacheTest/queue" > ).buildQueue(); > } > private static TestingCluster started(TestingCluster testingCluster) > throws Exception { > try { > testingCluster.start(); > return testingCluster; > } catch (Throwable throwable) { > try (testingCluster) { > throw throwable; > } > } > } > private static CuratorFramework getCuratorFramework(String connectString) > { > return CuratorFrameworkFactory.builder() > .ensembleProvider(new FixedEnsembleProvider(connectString, true)) > .retryPolicy(new ExponentialBackoffRetry(1000, 3)) > .build(); > } > } {code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)