[ https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443336#comment-17443336 ]
Jordan Zimmerman commented on CURATOR-623: ------------------------------------------ cc [~eolivelli] and [~cammckenzie] for releasing/review > DistributedQueue stops filling after long disconnect from cluster > ----------------------------------------------------------------- > > Key: CURATOR-623 > URL: https://issues.apache.org/jira/browse/CURATOR-623 > Project: Apache Curator > Issue Type: Bug > Reporter: Никита Соколов > Assignee: Jordan Zimmerman > Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > One of our VMs had network down for 12 minutes and after the network was up, > the queues have stopped being filled by external processes as curator gave up > on all watchers. Here is a test reproducing the issue: > {code:java} > import junit.framework.TestCase; > import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; > import org.apache.curator.framework.CuratorFramework; > import org.apache.curator.framework.CuratorFrameworkFactory; > import org.apache.curator.framework.recipes.queue.DistributedQueue; > import org.apache.curator.framework.recipes.queue.QueueBuilder; > import org.apache.curator.framework.recipes.queue.QueueConsumer; > import org.apache.curator.framework.recipes.queue.QueueSerializer; > import org.apache.curator.framework.state.ConnectionState; > import org.apache.curator.retry.ExponentialBackoffRetry; > import org.apache.curator.test.TestingCluster; > import java.util.concurrent.CompletableFuture; > import java.util.concurrent.TimeUnit; > import java.util.function.Consumer; > public class DistributedQueueTest extends TestCase { > public void test() throws Exception { > final var done = new CompletableFuture<>(); > try ( > final var testingCluster = started(new TestingCluster(1)); > final var dyingCuratorFramework = > getCuratorFramework(testingCluster.getConnectString()); > final var dyingQueue = newQueue(dyingCuratorFramework, item -> { > if (item.equals("0")) { > done.complete(null); > } > }) > ) { > dyingQueue.start(); > > testingCluster.killServer(testingCluster.getInstances().iterator().next()); > Thread.sleep(2 * 60_000); > > testingCluster.restartServer(testingCluster.getInstances().iterator().next()); > try ( > final var aliveCuratorFramework = > getCuratorFramework(testingCluster.getConnectString()); > final var aliveQueue = newQueue(aliveCuratorFramework, __ -> > {}) > ) { > aliveQueue.start(); > aliveQueue.put("0"); > done.get(1, TimeUnit.MINUTES); > } > } > } > private static DistributedQueue<String> newQueue(CuratorFramework > curatorFramework, Consumer<String> consumer) { > curatorFramework.start(); > return QueueBuilder.builder( > curatorFramework, > new QueueConsumer<String>() { > @Override > public void consumeMessage(String o) { > consumer.accept(o); > } > @Override > public void stateChanged(CuratorFramework curatorFramework, > ConnectionState connectionState) { > } > }, > new QueueSerializer<>() { > @Override > public byte[] serialize(String item) { > return item.getBytes(); > } > @Override > public String deserialize(byte[] bytes) { > return new String(bytes); > } > }, > "/MyChildrenCacheTest/queue" > ).buildQueue(); > } > private static TestingCluster started(TestingCluster testingCluster) > throws Exception { > try { > testingCluster.start(); > return testingCluster; > } catch (Throwable throwable) { > try (testingCluster) { > throw throwable; > } > } > } > private static CuratorFramework getCuratorFramework(String connectString) > { > return CuratorFrameworkFactory.builder() > .ensembleProvider(new FixedEnsembleProvider(connectString, true)) > .retryPolicy(new ExponentialBackoffRetry(1000, 3)) > .build(); > } > } {code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)