When the connection is lost, client.getZookeeperClient().getZooKeeper() needs to be called periodically so that the ensemble provider may update the connection string, etc.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04c16b11 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04c16b11 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04c16b11 Branch: refs/heads/CURATOR-455 Commit: 04c16b110207f10475059cff2c0363d2c1c43db3 Parents: f57206d Author: randgalt <randg...@apache.org> Authored: Tue Feb 20 13:36:22 2018 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Feb 20 13:36:22 2018 -0500 ---------------------------------------------------------------------- .../curator/ensemble/TestEnsembleProvider.java | 106 ++++++++++++ .../framework/state/ConnectionStateManager.java | 12 ++ .../ensemble/TestEnsembleProvider.java | 162 +++++++++++++++++++ .../apache/curator/test/BaseClassForTests.java | 2 +- 4 files changed, 281 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/04c16b11/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java ---------------------------------------------------------------------- diff --git a/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java b/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java new file mode 100644 index 0000000..36f0fd7 --- /dev/null +++ b/curator-client/src/test/java/org/apache/curator/ensemble/TestEnsembleProvider.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.ensemble; + +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.Semaphore; + +public class TestEnsembleProvider extends BaseClassForTests +{ + private final Timing timing = new Timing(); + + @Test + public void testBasic() throws Exception + { + Semaphore counter = new Semaphore(0); + final CuratorZookeeperClient client = new CuratorZookeeperClient(new CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, new RetryOneTime(2)); + try + { + client.start(); + Assert.assertTrue(timing.acquireSemaphore(counter)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testAfterSessionExpiration() throws Exception + { + Semaphore counter = new Semaphore(0); + final CuratorZookeeperClient client = new CuratorZookeeperClient(new CountingEnsembleProvider(counter), timing.session(), timing.connection(), null, new RetryOneTime(2)); + try + { + client.start(); + Assert.assertTrue(timing.acquireSemaphore(counter)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + private class CountingEnsembleProvider implements EnsembleProvider + { + private final Semaphore getConnectionStringCounter; + + public CountingEnsembleProvider(Semaphore getConnectionStringCounter) + { + 
this.getConnectionStringCounter = getConnectionStringCounter; + } + + @Override + public void start() + { + // NOP + } + + @Override + public String getConnectionString() + { + getConnectionStringCounter.release(); + return server.getConnectString(); + } + + @Override + public void close() + { + // NOP + } + + @Override + public void setConnectionString(String connectionString) + { + // NOP + } + + @Override + public boolean updateServerListEnabled() + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/04c16b11/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 251baa9..0c8ddf8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -315,6 +315,18 @@ public class ConnectionStateManager implements Closeable } } } + else if ( currentConnectionState == ConnectionState.LOST ) + { + try + { + // give ConnectionState.checkTimeouts() a chance to run, reset ensemble providers, etc. 
+ client.getZookeeperClient().getZooKeeper(); + } + catch ( Exception e ) + { + log.error("Could not get ZooKeeper", e); + } + } } private void setCurrentConnectionState(ConnectionState newConnectionState) http://git-wip-us.apache.org/repos/asf/curator/blob/04c16b11/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java b/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java new file mode 100644 index 0000000..73d94a5 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/ensemble/TestEnsembleProvider.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.ensemble; + +import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.TestingServer; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +public class TestEnsembleProvider extends BaseClassForTests +{ + private final Timing timing = new Timing(); + + @Test + public void testBasic() + { + Semaphore counter = new Semaphore(0); + final CuratorFramework client = newClient(counter); + try + { + client.start(); + Assert.assertTrue(timing.acquireSemaphore(counter)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test + public void testAfterSessionExpiration() throws Exception + { + TestingServer oldServer = server; + Semaphore counter = new Semaphore(0); + final CuratorFramework client = newClient(counter); + try + { + client.start(); + + final CountDownLatch connectedLatch = new CountDownLatch(1); + final CountDownLatch lostLatch = new CountDownLatch(1); + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( newState == ConnectionState.CONNECTED ) + { + connectedLatch.countDown(); + } + if ( newState == ConnectionState.LOST ) + { + lostLatch.countDown(); + } + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectedLatch.countDown(); 
+ } + } + }; + client.getConnectionStateListenable().addListener(listener); + Assert.assertTrue(timing.awaitLatch(connectedLatch)); + + server.stop(); + + Assert.assertTrue(timing.awaitLatch(lostLatch)); + counter.drainPermits(); + for ( int i = 0; i < 5; ++i ) + { + // the ensemble provider should still be called periodically when the connection is lost + Assert.assertTrue(timing.acquireSemaphore(counter), "Failed when i is: " + i); + } + + server = new TestingServer(); // this changes the CountingEnsembleProvider's value for getConnectionString() - connection should notice this and recover + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + } + finally + { + CloseableUtils.closeQuietly(client); + CloseableUtils.closeQuietly(oldServer); + } + } + + private CuratorFramework newClient(Semaphore counter) + { + return CuratorFrameworkFactory.builder() + .ensembleProvider(new CountingEnsembleProvider(counter)) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()) + .retryPolicy(new RetryOneTime(1)) + .build(); + } + + private class CountingEnsembleProvider implements EnsembleProvider + { + private final Semaphore getConnectionStringCounter; + + public CountingEnsembleProvider(Semaphore getConnectionStringCounter) + { + this.getConnectionStringCounter = getConnectionStringCounter; + } + + @Override + public void start() + { + // NOP + } + + @Override + public String getConnectionString() + { + getConnectionStringCounter.release(); + return server.getConnectString(); + } + + @Override + public void close() + { + // NOP + } + + @Override + public void setConnectionString(String connectionString) + { + // NOP + } + + @Override + public boolean updateServerListEnabled() + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/04c16b11/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java ---------------------------------------------------------------------- diff --git 
a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index 52446df..7c4af65 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class BaseClassForTests { - protected TestingServer server; + protected volatile TestingServer server; private final Logger log = LoggerFactory.getLogger(getClass()); private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;