Никита Соколов created CURATOR-623:
--------------------------------------

             Summary: DistributedQueue stops filling after long disconnect from 
cluster
                 Key: CURATOR-623
                 URL: https://issues.apache.org/jira/browse/CURATOR-623
             Project: Apache Curator
          Issue Type: Bug
            Reporter: Никита Соколов
            Assignee: Jordan Zimmerman


One of our VMs lost network connectivity for 12 minutes. After the network came 
back up, the queues were no longer being filled by external processes, because 
Curator had given up on all watchers. Here is a test reproducing the issue:
{code:java}
import junit.framework.TestCase;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DistributedQueueTest extends TestCase {
    /**
     * Reproduction scenario for CURATOR-623.
     *
     * <p>A queue is started against a single-server test cluster, the server is
     * killed, kept down for two minutes and then restarted. A second client and
     * queue on the same path then enqueue the item {@code "0"}, and the test
     * waits for the first (pre-outage) queue's consumer to receive it.
     *
     * @throws Exception if any cluster/client operation fails, or if the item
     *         is not consumed by the first queue within one minute
     */
    public void test() throws Exception {
        final var itemConsumed = new CompletableFuture<>();
        try (
            final var cluster = started(new TestingCluster(1));
            final var staleClient = getCuratorFramework(cluster.getConnectString());
            final var staleQueue = newQueue(staleClient, item -> {
                // Only the marker item enqueued after the outage completes the future.
                if (item.equals("0")) {
                    itemConsumed.complete(null);
                }
            })
        ) {
            staleQueue.start();

            // Simulate the outage: take the only server down, wait, bring it back.
            cluster.killServer(cluster.getInstances().iterator().next());
            TimeUnit.MINUTES.sleep(2);
            cluster.restartServer(cluster.getInstances().iterator().next());

            try (
                final var freshClient = getCuratorFramework(cluster.getConnectString());
                final var freshQueue = newQueue(freshClient, ignored -> {})
            ) {
                freshQueue.start();
                freshQueue.put("0");
                // Times out if the pre-outage queue never consumes the item.
                itemConsumed.get(1, TimeUnit.MINUTES);
            }
        }
    }

    /**
     * Starts the given client and builds a string {@link DistributedQueue} on the
     * path {@code /MyChildrenCacheTest/queue}.
     *
     * <p>Items are serialized as UTF-8 so that the bytes stored in the queue do
     * not depend on the default charset of the JVM that wrote or reads them.
     *
     * @param curatorFramework client to start and to build the queue on
     * @param consumer callback invoked with every message the queue consumes
     * @return the built queue; this method does not call {@code start()} on it
     */
    private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
        curatorFramework.start();
        return QueueBuilder.builder(
            curatorFramework,
            new QueueConsumer<String>() {
                @Override
                public void consumeMessage(String o) {
                    consumer.accept(o);
                }

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState connectionState) {
                    // This reproduction does not react to connection state changes.
                }
            },
            new QueueSerializer<>() {
                @Override
                public byte[] serialize(String item) {
                    // Explicit charset: the no-arg getBytes() uses the platform default.
                    return item.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(byte[] bytes) {
                    // Must mirror serialize() so round-trips are lossless on every JVM.
                    return new String(bytes, StandardCharsets.UTF_8);
                }
            },
            "/MyChildrenCacheTest/queue"
        ).buildQueue();
    }

    /**
     * Starts the given cluster and hands it back, so it can be declared directly
     * as a try-with-resources resource.
     *
     * <p>If starting fails, the cluster is closed before the failure propagates;
     * a failure raised while closing is attached to the original one as a
     * suppressed exception (standard try-with-resources behavior).
     *
     * @param testingCluster the cluster to start
     * @return the same cluster instance, started
     * @throws Exception whatever {@code start()} threw
     */
    private static TestingCluster started(TestingCluster testingCluster) throws Exception {
        try {
            testingCluster.start();
        } catch (Throwable startFailure) {
            // The resource-only try closes the half-started cluster, then rethrows.
            try (testingCluster) {
                throw startFailure;
            }
        }
        return testingCluster;
    }

    /**
     * Builds (but does not start) a Curator client for the given connect string,
     * using a {@link FixedEnsembleProvider} and an {@link ExponentialBackoffRetry}
     * policy constructed with the arguments {@code (1000, 3)}.
     *
     * @param connectString ZooKeeper connect string of the cluster to talk to
     * @return a new, not yet started client
     */
    private static CuratorFramework getCuratorFramework(String connectString) {
        final var ensembleProvider = new FixedEnsembleProvider(connectString, true);
        final var retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final var clientBuilder = CuratorFrameworkFactory.builder();
        clientBuilder.ensembleProvider(ensembleProvider);
        clientBuilder.retryPolicy(retryPolicy);
        return clientBuilder.build();
    }
} {code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to