Support getting at the cached config from the ensemble tracker
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/b89091e9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/b89091e9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/b89091e9 Branch: refs/heads/CURATOR-3.0 Commit: b89091e9363e760aa34028bcfb57baf6ca921957 Parents: cb34e6f Author: randgalt <randg...@apache.org> Authored: Fri Oct 2 09:37:13 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Oct 2 09:37:13 2015 -0500 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 8 ++++++ .../framework/imps/CuratorFrameworkImpl.java | 7 +++++ .../curator/framework/imps/EnsembleTracker.java | 27 +++++++++++++++----- .../curator/framework/imps/NamespaceFacade.java | 7 +++++ .../framework/imps/WatcherRemovalFacade.java | 7 +++++ .../framework/imps/TestReconfiguration.java | 12 ++++----- 6 files changed, 55 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 3d197a0..29c5f06 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import 
java.io.Closeable; import java.util.concurrent.TimeUnit; @@ -305,4 +306,11 @@ public interface CuratorFramework extends Closeable * @return error policy */ public ConnectionStateErrorPolicy getConnectionStateErrorPolicy(); + + /** + * Curator maintains a cached view of the Zookeeper quorum config. + * + * @return the current config + */ + public QuorumVerifier getCurrentConfig(); } http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index c3215ad..db18594 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; @@ -171,6 +172,12 @@ public class CuratorFrameworkImpl implements CuratorFramework return new WatcherRemovalFacade(this); } + @Override + public QuorumVerifier getCurrentConfig() + { + return (ensembleTracker != null) ? 
ensembleTracker.getCurrentConfig() : null; + } + private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory) { return new ZookeeperFactory() http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index acd01ee..a46fed1 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.imps; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.WatcherRemoveCuratorFramework; @@ -50,6 +51,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher private final WatcherRemoveCuratorFramework client; private final EnsembleProvider ensembleProvider; private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap())); private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override @@ -108,6 +110,16 @@ public class EnsembleTracker implements Closeable, CuratorWatcher } } + /** + * Return the current quorum config + * + * @return config + */ + public QuorumVerifier getCurrentConfig() + { + return currentConfig.get(); + } + private void reset() throws Exception { BackgroundCallback 
backgroundCallback = new BackgroundCallback() @@ -125,13 +137,10 @@ public class EnsembleTracker implements Closeable, CuratorWatcher } @VisibleForTesting - public static String configToConnectionString(byte[] data) throws Exception + public static String configToConnectionString(QuorumVerifier data) throws Exception { - Properties properties = new Properties(); - properties.load(new ByteArrayInputStream(data)); - QuorumVerifier qv = new QuorumMaj(properties); StringBuilder sb = new StringBuilder(); - for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() ) + for ( QuorumPeer.QuorumServer server : data.getAllMembers().values() ) { if ( sb.length() != 0 ) { @@ -146,7 +155,13 @@ public class EnsembleTracker implements Closeable, CuratorWatcher private void processConfigData(byte[] data) throws Exception { log.info("New config event received: " + Arrays.toString(data)); - String connectionString = configToConnectionString(data); + + Properties properties = new Properties(); + properties.load(new ByteArrayInputStream(data)); + QuorumMaj newConfig = new QuorumMaj(properties); + currentConfig.set(newConfig); + + String connectionString = configToConnectionString(newConfig); if ( connectionString.trim().length() > 0 ) { ensembleProvider.setConnectionString(connectionString); http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java index 60ef647..9935670 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.listen.Listenable; 
import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; class NamespaceFacade extends CuratorFrameworkImpl { @@ -49,6 +50,12 @@ class NamespaceFacade extends CuratorFrameworkImpl } @Override + public QuorumVerifier getCurrentConfig() + { + return client.getCurrentConfig(); + } + + @Override public CuratorFramework nonNamespaceView() { return usingNamespace(null); http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index 47c2104..371fc63 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -30,6 +30,7 @@ import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework { @@ -55,6 +56,12 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove } @Override + public QuorumVerifier getCurrentConfig() + { + return client.getCurrentConfig(); + } + + @Override public void removeWatchers() { removalManager.removeWatchers(); http://git-wip-us.apache.org/repos/asf/curator/blob/b89091e9/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java 
---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index e399a4d..7565590 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -45,10 +45,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -164,7 +162,7 @@ public class TestReconfiguration extends BaseClassForTests QuorumVerifier quorumVerifier = toQuorumVerifier(configData); System.out.println(quorumVerifier); assertConfig(quorumVerifier, cluster.getInstances()); - Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), ensembleProvider.getConnectionString()); + Assert.assertEquals(EnsembleTracker.configToConnectionString(quorumVerifier), ensembleProvider.getConnectionString()); } } @@ -192,7 +190,7 @@ public class TestReconfiguration extends BaseClassForTests List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances()); newInstances.addAll(newCluster.getInstances()); assertConfig(newConfig, newInstances); - Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString()); + Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); } } } @@ -234,7 +232,7 @@ public class TestReconfiguration extends BaseClassForTests List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances()); 
newInstances.addAll(newCluster.getInstances()); assertConfig(newConfig, newInstances); - Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString()); + Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); } } } @@ -275,7 +273,7 @@ public class TestReconfiguration extends BaseClassForTests newInstances.addAll(instances); newInstances.remove(removeSpec); assertConfig(newConfig, newInstances); - Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString()); + Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); } } } @@ -311,7 +309,7 @@ public class TestReconfiguration extends BaseClassForTests QuorumVerifier newConfig = toQuorumVerifier(newConfigData); Assert.assertEquals(newConfig.getAllMembers().size(), 3); assertConfig(newConfig, smallCluster); - Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), ensembleProvider.getConnectionString()); + Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString()); } }