[ 
https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443319#comment-17443319
 ] 

Никита Соколов commented on CURATOR-623:
----------------------------------------

You are right about the test being faulty, but the problem is still real: I 
have made aliveQueue producer-only and the test still fails. This synthetic 
test does not exactly reproduce what we were trying to achieve in production, 
but it suffers from the same problem: ChildrenCache watchers do not 
reconnect after the server has been unreachable for a long period.

 
{code:java}
import junit.framework.TestCase;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Reproduction for CURATOR-623: a {@link DistributedQueue} consumer stops receiving items
 * after the server has been unreachable for a long time.
 *
 * <p>The scenario uses two queue instances on the same path: a "dying" queue with a consumer,
 * which is started before the server is killed, and an "alive" producer-only queue, which is
 * created after the server is restarted and is used to publish a single item.
 */
public class DistributedQueueTest extends TestCase {
    /**
     * Starts a consuming queue, kills the server for two minutes, restarts it, publishes
     * {@code "0"} through a second producer-only queue and waits up to one minute for the
     * first queue's consumer to observe that item.
     *
     * @throws Exception if the cluster or a client fails, or if the item is not consumed
     *                   within the timeout ({@link java.util.concurrent.TimeoutException})
     */
    public void test() throws Exception {
        // Typed as Void: the future is only a completion signal and never carries a value.
        final var done = new CompletableFuture<Void>();
        try (
            final var testingCluster = started(new TestingCluster(1));
            final var dyingCuratorFramework =
                getCuratorFramework(testingCluster.getConnectString());
            final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
                if (item.equals("0")) {
                    done.complete(null);
                }
            })
        ) {
            dyingQueue.start();
            // Take the first (and, with TestingCluster(1), presumably only) instance down,
            // keep it down for two minutes, then bring it back.
            testingCluster.killServer(testingCluster.getInstances().iterator().next());
            Thread.sleep(2 * 60_000);
            testingCluster.restartServer(testingCluster.getInstances().iterator().next());
            try (
                final var aliveCuratorFramework =
                    getCuratorFramework(testingCluster.getConnectString());
                // null consumer: this instance only publishes, so it cannot take the item itself.
                final var aliveQueue = newQueue(aliveCuratorFramework, null)
            ) {
                aliveQueue.start();
                aliveQueue.put("0");
                // Fails with TimeoutException if the dying queue never sees the item.
                done.get(1, TimeUnit.MINUTES);
            }
        }
    }

    /**
     * Starts the given client and builds a string queue on {@code /MyChildrenCacheTest/queue}.
     *
     * @param curatorFramework client to start and build the queue on
     * @param consumer         callback for consumed items, or {@code null} to pass a
     *                         {@code null} {@link QueueConsumer} to the builder
     * @return the built queue; the caller is responsible for starting and closing it
     */
    private static DistributedQueue<String> newQueue(
        CuratorFramework curatorFramework,
        Consumer<String> consumer
    ) {
        curatorFramework.start();
        return QueueBuilder.builder(
            curatorFramework,
            consumer == null ? null : new QueueConsumer<String>() {
                @Override
                public void consumeMessage(String o) {
                    consumer.accept(o);
                }

                @Override
                public void stateChanged(
                    CuratorFramework curatorFramework,
                    ConnectionState connectionState
                ) {
                    // Connection state changes are intentionally ignored by this test.
                }
            },
            new QueueSerializer<>() {
                // Explicit UTF-8 keeps the encoding independent of the platform default charset.
                @Override
                public byte[] serialize(String item) {
                    return item.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes, StandardCharsets.UTF_8);
                }
            },
            "/MyChildrenCacheTest/queue"
        ).buildQueue();
    }

    /**
     * Starts the cluster and returns it, so it can be used directly as a
     * try-with-resources resource.
     *
     * @param testingCluster cluster to start
     * @return the same cluster instance, started
     * @throws Exception if starting fails; the cluster is closed before the failure is rethrown
     */
    private static TestingCluster started(TestingCluster testingCluster) throws Exception {
        try {
            testingCluster.start();
            return testingCluster;
        } catch (Throwable throwable) {
            // Close the half-started cluster, then propagate the original failure.
            try (testingCluster) {
                throw throwable;
            }
        }
    }

    /**
     * Builds (but does not start) a client for the given connect string, using a fixed
     * ensemble provider and an exponential backoff retry policy (1000, 3).
     *
     * @param connectString connection string of the test cluster
     * @return a new, unstarted client
     */
    private static CuratorFramework getCuratorFramework(String connectString) {
        return CuratorFrameworkFactory.builder()
            .ensembleProvider(new FixedEnsembleProvider(connectString, true))
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    }
} {code}
 

 

> DistributedQueue stops filling after long disconnect from cluster
> -----------------------------------------------------------------
>
>                 Key: CURATOR-623
>                 URL: https://issues.apache.org/jira/browse/CURATOR-623
>             Project: Apache Curator
>          Issue Type: Bug
>            Reporter: Никита Соколов
>            Assignee: Jordan Zimmerman
>            Priority: Major
>
> One of our VMs had network down for 12 minutes and after the network was up, 
> the queues have stopped being filled by external processes as curator gave up 
> on all watchers. Here is a test reproducing the issue:
> {code:java}
> import junit.framework.TestCase;
> import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueBuilder;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.recipes.queue.QueueSerializer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> import org.apache.curator.test.TestingCluster;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.function.Consumer;
> public class DistributedQueueTest extends TestCase {
>     public void test() throws Exception {
>         final var done = new CompletableFuture<>();
>         try (
>             final var testingCluster = started(new TestingCluster(1));
>             final var dyingCuratorFramework = 
> getCuratorFramework(testingCluster.getConnectString());
>             final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
>                 if (item.equals("0")) {
>                     done.complete(null);
>                 }
>             })
>         ) {
>             dyingQueue.start();
>             
> testingCluster.killServer(testingCluster.getInstances().iterator().next());
>             Thread.sleep(2 * 60_000);
>             
> testingCluster.restartServer(testingCluster.getInstances().iterator().next());
>             try (
>                 final var aliveCuratorFramework = 
> getCuratorFramework(testingCluster.getConnectString());
>                 final var aliveQueue = newQueue(aliveCuratorFramework, __ -> 
> {})
>             ) {
>                 aliveQueue.start();
>                 aliveQueue.put("0");
>                 done.get(1, TimeUnit.MINUTES);
>             }
>         }
>     }
>     private static DistributedQueue<String> newQueue(CuratorFramework 
> curatorFramework, Consumer<String> consumer) {
>         curatorFramework.start();
>         return QueueBuilder.builder(
>             curatorFramework,
>             new QueueConsumer<String>() {
>                 @Override
>                 public void consumeMessage(String o) {
>                     consumer.accept(o);
>                 }
>                 @Override
>                 public void stateChanged(CuratorFramework curatorFramework, 
> ConnectionState connectionState) {
>                 }
>             },
>             new QueueSerializer<>() {
>                 @Override
>                 public byte[] serialize(String item) {
>                     return item.getBytes();
>                 }
>                 @Override
>                 public String deserialize(byte[] bytes) {
>                     return new String(bytes);
>                 }
>             },
>             "/MyChildrenCacheTest/queue"
>         ).buildQueue();
>     }
>     private static TestingCluster started(TestingCluster testingCluster) 
> throws Exception {
>         try {
>             testingCluster.start();
>             return testingCluster;
>         } catch (Throwable throwable) {
>             try (testingCluster) {
>                 throw throwable;
>             }
>         }
>     }
>     private static CuratorFramework getCuratorFramework(String connectString) 
> {
>         return CuratorFrameworkFactory.builder()
>             .ensembleProvider(new FixedEnsembleProvider(connectString, true))
>             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
>             .build();
>     }
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to