[
https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443319#comment-17443319
]
Никита Соколов edited comment on CURATOR-623 at 11/14/21, 11:37 AM:
--------------------------------------------------------------------
You are right about the test being faulty, but the problem is still real: I
have made aliveQueue producer-only and the test still fails. This synthetic
test does not exactly reproduce what we were trying to achieve in production,
but it suffers from the same problem: ChildrenCache watchers do not reconnect
after the server has been unreachable for a long period.
{code:java}
import junit.framework.TestCase;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Reproduction for CURATOR-623: after a long outage of the only ZooKeeper server, a
 * {@link DistributedQueue} consumer is expected to pick up items put into the queue once the
 * server is back.
 *
 * <p>Flow: start a consuming ("dying") queue, kill the single server, wait two minutes,
 * restart it, then put {@code "0"} through a separate producer-only ("alive") queue and wait
 * up to one minute for the dying queue to consume it.
 */
public class DistributedQueueTest extends TestCase {
    /** ZooKeeper path shared by both queues. */
    private static final String QUEUE_PATH = "/MyChildrenCacheTest/queue";

    /**
     * Fails (with a {@link java.util.concurrent.TimeoutException} from {@code done.get}) if the
     * dying queue does not consume {@code "0"} within one minute of it being put.
     */
    public void test() throws Exception {
        // Explicit type argument: `var` with a bare diamond would infer CompletableFuture<Object>.
        final CompletableFuture<Void> done = new CompletableFuture<>();
        try (
            final var testingCluster = started(new TestingCluster(1));
            final var dyingCuratorFramework =
                getCuratorFramework(testingCluster.getConnectString());
            final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
                // Constant-first comparison: a null item cannot throw here.
                if ("0".equals(item)) {
                    done.complete(null);
                }
            })
        ) {
            dyingQueue.start();
            // The cluster has a single instance; kill and restart that same instance.
            final var server = testingCluster.getInstances().iterator().next();
            testingCluster.killServer(server);
            // Keep the server down for two minutes. NOTE(review): presumably chosen to outlast
            // the client's session timeout, which is not configured here — confirm.
            TimeUnit.MINUTES.sleep(2);
            testingCluster.restartServer(server);
            try (
                final var aliveCuratorFramework =
                    getCuratorFramework(testingCluster.getConnectString());
                // Producer-only (null consumer) so that only dyingQueue can consume the item.
                final var aliveQueue = newQueue(aliveCuratorFramework, null)
            ) {
                aliveQueue.start();
                aliveQueue.put("0");
                done.get(1, TimeUnit.MINUTES);
            }
        }
    }

    /**
     * Starts {@code curatorFramework} and builds, but does not start, a String queue at
     * {@link #QUEUE_PATH}.
     *
     * @param curatorFramework client to start and to build the queue on
     * @param consumer receives every consumed item; {@code null} builds a producer-only queue
     * @return the built, not-yet-started queue
     */
    private static DistributedQueue<String> newQueue(
            CuratorFramework curatorFramework, Consumer<String> consumer) {
        curatorFramework.start();
        return QueueBuilder.builder(
            curatorFramework,
            consumer == null ? null : new QueueConsumer<String>() {
                @Override
                public void consumeMessage(String o) {
                    consumer.accept(o);
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework,
                                         ConnectionState connectionState) {
                    // Intentionally ignored: the test only observes consumed items.
                }
            },
            new QueueSerializer<>() {
                // Explicit charset: the no-arg overloads use the platform default charset.
                @Override
                public byte[] serialize(String item) {
                    return item.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
                }
            },
            QUEUE_PATH
        ).buildQueue();
    }

    /**
     * Starts {@code testingCluster} and returns it. If starting fails, the cluster is closed
     * (a failure while closing is attached as a suppressed exception) and the original
     * failure is rethrown.
     *
     * @param testingCluster cluster to start
     * @return the same, now started, cluster
     * @throws Exception whatever {@code testingCluster.start()} threw
     */
    private static TestingCluster started(TestingCluster testingCluster) throws Exception {
        try {
            testingCluster.start();
            return testingCluster;
        } catch (Throwable throwable) {
            try (testingCluster) {
                throw throwable;
            }
        }
    }

    /**
     * Builds, but does not start, a client for {@code connectString} that uses a fixed ensemble
     * provider and {@code ExponentialBackoffRetry(1000, 3)}.
     *
     * @param connectString ZooKeeper connection string of the ensemble
     * @return an unstarted client
     */
    private static CuratorFramework getCuratorFramework(String connectString) {
        return CuratorFrameworkFactory.builder()
            .ensembleProvider(new FixedEnsembleProvider(connectString, true))
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    }
} {code}
was (Author: faucct):
You are right about the test being faulty, but the problem is still real: I
have made aliveQueue producer-only and the test still fails. This synthetic
test does not exactly reproduce what we were trying to achieve in production,
but it suffers from the same problem: ChildrenCache watchers do not
reconnect after the server has been unreachable for a long period.
{code:java}
import junit.framework.TestCase;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Reproduction for CURATOR-623: after a long outage of the only ZooKeeper server, a
 * {@link DistributedQueue} consumer is expected to pick up items put into the queue once the
 * server is back.
 *
 * <p>Flow: start a consuming ("dying") queue, kill the single server, wait two minutes,
 * restart it, then put {@code "0"} through a separate producer-only ("alive") queue and wait
 * up to one minute for the dying queue to consume it.
 */
public class DistributedQueueTest extends TestCase {
    /** ZooKeeper path shared by both queues. */
    private static final String QUEUE_PATH = "/MyChildrenCacheTest/queue";

    /**
     * Fails (with a {@link java.util.concurrent.TimeoutException} from {@code done.get}) if the
     * dying queue does not consume {@code "0"} within one minute of it being put.
     */
    public void test() throws Exception {
        // Explicit type argument: `var` with a bare diamond would infer CompletableFuture<Object>.
        final CompletableFuture<Void> done = new CompletableFuture<>();
        try (
            final var testingCluster = started(new TestingCluster(1));
            final var dyingCuratorFramework =
                getCuratorFramework(testingCluster.getConnectString());
            final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
                // Constant-first comparison: a null item cannot throw here.
                if ("0".equals(item)) {
                    done.complete(null);
                }
            })
        ) {
            dyingQueue.start();
            // The cluster has a single instance; kill and restart that same instance.
            final var server = testingCluster.getInstances().iterator().next();
            testingCluster.killServer(server);
            // Keep the server down for two minutes. NOTE(review): presumably chosen to outlast
            // the client's session timeout, which is not configured here — confirm.
            TimeUnit.MINUTES.sleep(2);
            testingCluster.restartServer(server);
            try (
                final var aliveCuratorFramework =
                    getCuratorFramework(testingCluster.getConnectString());
                // Producer-only (null consumer) so that only dyingQueue can consume the item.
                final var aliveQueue = newQueue(aliveCuratorFramework, null)
            ) {
                aliveQueue.start();
                aliveQueue.put("0");
                done.get(1, TimeUnit.MINUTES);
            }
        }
    }

    /**
     * Starts {@code curatorFramework} and builds, but does not start, a String queue at
     * {@link #QUEUE_PATH}.
     *
     * @param curatorFramework client to start and to build the queue on
     * @param consumer receives every consumed item; {@code null} builds a producer-only queue
     * @return the built, not-yet-started queue
     */
    private static DistributedQueue<String> newQueue(
            CuratorFramework curatorFramework, Consumer<String> consumer) {
        curatorFramework.start();
        return QueueBuilder.builder(
            curatorFramework,
            consumer == null ? null : new QueueConsumer<String>() {
                @Override
                public void consumeMessage(String o) {
                    consumer.accept(o);
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework,
                                         ConnectionState connectionState) {
                    // Intentionally ignored: the test only observes consumed items.
                }
            },
            new QueueSerializer<>() {
                // Explicit charset: the no-arg overloads use the platform default charset.
                @Override
                public byte[] serialize(String item) {
                    return item.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
                }
            },
            QUEUE_PATH
        ).buildQueue();
    }

    /**
     * Starts {@code testingCluster} and returns it. If starting fails, the cluster is closed
     * (a failure while closing is attached as a suppressed exception) and the original
     * failure is rethrown.
     *
     * @param testingCluster cluster to start
     * @return the same, now started, cluster
     * @throws Exception whatever {@code testingCluster.start()} threw
     */
    private static TestingCluster started(TestingCluster testingCluster) throws Exception {
        try {
            testingCluster.start();
            return testingCluster;
        } catch (Throwable throwable) {
            try (testingCluster) {
                throw throwable;
            }
        }
    }

    /**
     * Builds, but does not start, a client for {@code connectString} that uses a fixed ensemble
     * provider and {@code ExponentialBackoffRetry(1000, 3)}.
     *
     * @param connectString ZooKeeper connection string of the ensemble
     * @return an unstarted client
     */
    private static CuratorFramework getCuratorFramework(String connectString) {
        return CuratorFrameworkFactory.builder()
            .ensembleProvider(new FixedEnsembleProvider(connectString, true))
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    }
} {code}
> DistributedQueue stops filling after long disconnect from cluster
> -----------------------------------------------------------------
>
> Key: CURATOR-623
> URL: https://issues.apache.org/jira/browse/CURATOR-623
> Project: Apache Curator
> Issue Type: Bug
> Reporter: Никита Соколов
> Assignee: Jordan Zimmerman
> Priority: Major
>
> One of our VMs lost network connectivity for 12 minutes, and after the network
> came back up, the queues stopped being filled by external processes because
> Curator had given up on all of its watchers. Here is a test reproducing the issue:
> {code:java}
> import junit.framework.TestCase;
> import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueBuilder;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.recipes.queue.QueueSerializer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> import org.apache.curator.test.TestingCluster;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.function.Consumer;
> // Original CURATOR-623 reproduction: a consuming queue ("dying") loses its only server
> // for two minutes; after the restart a second queue ("alive") puts "0" and the test
> // waits up to one minute for the dying queue to consume it.
> public class DistributedQueueTest extends TestCase {
> public void test() throws Exception {
> // NOTE(review): `var` with a bare diamond infers CompletableFuture<Object>.
> final var done = new CompletableFuture<>();
> try (
> final var testingCluster = started(new TestingCluster(1));
> final var dyingCuratorFramework =
> getCuratorFramework(testingCluster.getConnectString());
> final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
> if (item.equals("0")) {
> done.complete(null);
> }
> })
> ) {
> dyingQueue.start();
>
> testingCluster.killServer(testingCluster.getInstances().iterator().next());
> // Keep the only server down for two minutes before restarting it.
> Thread.sleep(2 * 60_000);
>
> testingCluster.restartServer(testingCluster.getInstances().iterator().next());
> try (
> final var aliveCuratorFramework =
> getCuratorFramework(testingCluster.getConnectString());
> // NOTE(review): aliveQueue is also a consumer (it discards every item), so it
> // can take "0" before dyingQueue does and make the test fail for the wrong
> // reason; the follow-up version passes a null consumer (producer-only) instead.
> final var aliveQueue = newQueue(aliveCuratorFramework, __ ->
> {})
> ) {
> aliveQueue.start();
> aliveQueue.put("0");
> done.get(1, TimeUnit.MINUTES);
> }
> }
> }
> // Starts the client, then builds (without starting) a String queue at the shared
> // path; every consumed item is forwarded to `consumer`, which must be non-null here.
> private static DistributedQueue<String> newQueue(CuratorFramework
> curatorFramework, Consumer<String> consumer) {
> curatorFramework.start();
> return QueueBuilder.builder(
> curatorFramework,
> new QueueConsumer<String>() {
> @Override
> public void consumeMessage(String o) {
> consumer.accept(o);
> }
> @Override
> public void stateChanged(CuratorFramework curatorFramework,
> ConnectionState connectionState) {
> // Intentionally empty: connection-state changes are not observed.
> }
> },
> // NOTE(review): getBytes()/new String(byte[]) use the platform default charset.
> new QueueSerializer<>() {
> @Override
> public byte[] serialize(String item) {
> return item.getBytes();
> }
> @Override
> public String deserialize(byte[] bytes) {
> return new String(bytes);
> }
> },
> "/MyChildrenCacheTest/queue"
> ).buildQueue();
> }
> // Starts the cluster and returns it; if starting fails, closes the cluster (a close
> // failure becomes a suppressed exception) and rethrows the original failure.
> private static TestingCluster started(TestingCluster testingCluster)
> throws Exception {
> try {
> testingCluster.start();
> return testingCluster;
> } catch (Throwable throwable) {
> try (testingCluster) {
> throw throwable;
> }
> }
> }
> // Builds (without starting) a client for connectString with a fixed ensemble
> // provider and an ExponentialBackoffRetry(1000, 3) retry policy.
> private static CuratorFramework getCuratorFramework(String connectString)
> {
> return CuratorFrameworkFactory.builder()
> .ensembleProvider(new FixedEnsembleProvider(connectString, true))
> .retryPolicy(new ExponentialBackoffRetry(1000, 3))
> .build();
> }
> } {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)