[
https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zili Chen resolved CURATOR-623.
-------------------------------
Resolution: Fixed
master via d58aa73f4cc78325abb2ae0ffd95fd9cf0776541
> DistributedQueue stops filling after long disconnect from cluster
> -----------------------------------------------------------------
>
> Key: CURATOR-623
> URL: https://issues.apache.org/jira/browse/CURATOR-623
> Project: Apache Curator
> Issue Type: Bug
> Reporter: Никита Соколов
> Assignee: Jordan Zimmerman
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> One of our VMs lost network connectivity for 12 minutes. After the network came
> back, the queues stopped being filled by external processes because Curator had
> given up on all of its watchers. Here is a test that reproduces the issue:
> {code:java}
> import junit.framework.TestCase;
> import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueBuilder;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.recipes.queue.QueueSerializer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> import org.apache.curator.test.TestingCluster;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.function.Consumer;
> /**
>  * Reproduction for CURATOR-623: after a long outage of the ZooKeeper server, a
>  * DistributedQueue consumer no longer receives items that another client puts
>  * into the same queue.
>  */
> public class DistributedQueueTest extends TestCase {
>
>     /**
>      * Starts a consuming queue, kills the only ZooKeeper server for two minutes,
>      * restarts it, then has a second, freshly created client put the item "0"
>      * and waits up to one minute for the original consumer to receive it.
>      */
>     public void test() throws Exception {
>         // Completed by the original ("dying") consumer once it sees the item "0".
>         final CompletableFuture<Void> done = new CompletableFuture<>();
>         try (
>             final var testingCluster = started(new TestingCluster(1));
>             final var dyingCuratorFramework =
>                 getCuratorFramework(testingCluster.getConnectString());
>             final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
>                 if (item.equals("0")) {
>                     done.complete(null);
>                 }
>             })
>         ) {
>             dyingQueue.start();
>
>             testingCluster.killServer(testingCluster.getInstances().iterator().next());
>             // NOTE(review): the 2-minute outage is presumably chosen to outlast the
>             // client's session timeout and retry policy so the session is lost;
>             // confirm against the configured values before shortening it.
>             Thread.sleep(2 * 60_000);
>
>             testingCluster.restartServer(testingCluster.getInstances().iterator().next());
>             try (
>                 final var aliveCuratorFramework =
>                     getCuratorFramework(testingCluster.getConnectString());
>                 final var aliveQueue = newQueue(aliveCuratorFramework, ignored -> {})
>             ) {
>                 aliveQueue.start();
>                 aliveQueue.put("0");
>                 // Throws TimeoutException, failing the test, if the original
>                 // consumer never receives the item.
>                 done.get(1, TimeUnit.MINUTES);
>             }
>         }
>     }
>
>     /**
>      * Starts {@code curatorFramework} (a side effect callers rely on) and builds a
>      * String queue at {@code /MyChildrenCacheTest/queue} that passes every consumed
>      * item to {@code consumer}. The returned queue still has to be started.
>      */
>     private static DistributedQueue<String> newQueue(
>             CuratorFramework curatorFramework, Consumer<String> consumer) {
>         curatorFramework.start();
>         return QueueBuilder.builder(
>             curatorFramework,
>             new QueueConsumer<String>() {
>                 @Override
>                 public void consumeMessage(String o) {
>                     consumer.accept(o);
>                 }
>
>                 @Override
>                 public void stateChanged(CuratorFramework curatorFramework,
>                                          ConnectionState connectionState) {
>                     // No-op: connection-state changes are ignored by this test.
>                 }
>             },
>             new QueueSerializer<>() {
>                 // Explicit UTF-8 so the encoding does not depend on the platform
>                 // default charset.
>                 @Override
>                 public byte[] serialize(String item) {
>                     return item.getBytes(java.nio.charset.StandardCharsets.UTF_8);
>                 }
>
>                 @Override
>                 public String deserialize(byte[] bytes) {
>                     return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
>                 }
>             },
>             "/MyChildrenCacheTest/queue"
>         ).buildQueue();
>     }
>
>     /**
>      * Starts {@code testingCluster} and returns it. If starting fails, the cluster
>      * is closed before the original failure is rethrown.
>      */
>     private static TestingCluster started(TestingCluster testingCluster) throws Exception {
>         try {
>             testingCluster.start();
>             return testingCluster;
>         } catch (Throwable throwable) {
>             // try-with-resources closes the cluster; a failure in close() is
>             // attached to 'throwable' as a suppressed exception.
>             try (testingCluster) {
>                 throw throwable;
>             }
>         }
>     }
>
>     /**
>      * Builds (but does not start) a client for {@code connectString} using a fixed
>      * ensemble and exponential-backoff retries (1000 ms base sleep, 3 retries).
>      */
>     private static CuratorFramework getCuratorFramework(String connectString) {
>         return CuratorFrameworkFactory.builder()
>             .ensembleProvider(new FixedEnsembleProvider(connectString, true))
>             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
>             .build();
>     }
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)