Repository: curator
Updated Branches:
  refs/heads/CURATOR-266 [created] 26364c618


1. EnsembleTracker should always be on, it now is
2. Removed DynamicEnsembleProvider. This should not be optional. 
EnsembleTracker now always publishes config changes which will end up calling 
ZooKeeper.updateServerList()
3. Testing


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/26364c61
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/26364c61
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/26364c61

Branch: refs/heads/CURATOR-266
Commit: 26364c6186fc7c09a9462557b1ca791e9aa70006
Parents: a7076bc
Author: randgalt <randg...@apache.org>
Authored: Sat Sep 26 13:13:02 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Sep 26 13:13:02 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     |  32 +++-
 .../java/org/apache/curator/HandleHolder.java   |   4 +-
 .../ClassicConnectionHandlingPolicy.java        |   4 +-
 .../connection/ConnectionHandlingPolicy.java    |   6 +-
 .../StandardConnectionHandlingPolicy.java       |   4 +-
 .../curator/ensemble/EnsembleListener.java      |  24 ---
 .../curator/ensemble/EnsembleProvider.java      |   2 +
 .../dynamic/DynamicEnsembleProvider.java        |  61 ------
 .../exhibitor/ExhibitorEnsembleProvider.java    |   6 +
 .../ensemble/fixed/FixedEnsembleProvider.java   |  13 +-
 .../framework/ensemble/EnsembleTracker.java     | 191 -------------------
 .../framework/imps/CuratorFrameworkImpl.java    |   7 +
 .../curator/framework/imps/EnsembleTracker.java | 150 +++++++++++++++
 .../framework/imps/TestReconfiguration.java     |  57 +++++-
 .../locks/TestInterProcessSemaphoreCluster.java |   5 +
 15 files changed, 263 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ConnectionState.java 
b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index eea2ce0..4c1e6ad 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -199,12 +199,14 @@ class ConnectionState implements Watcher, Closeable
 
     private synchronized void checkTimeouts() throws Exception
     {
-        Callable<Boolean> hasNewConnectionString  = new Callable<Boolean>()
+        final AtomicReference<String> newConnectionString = new 
AtomicReference<>();
+        Callable<String> hasNewConnectionString = new Callable<String>()
         {
             @Override
-            public Boolean call()
+            public String call()
             {
-                return zooKeeper.hasNewConnectionString();
+                newConnectionString.set(zooKeeper.getNewConnectionString());
+                return newConnectionString.get();
             }
         };
         int lastNegotiatedSessionTimeoutMs = 
getLastNegotiatedSessionTimeoutMs();
@@ -220,7 +222,7 @@ class ConnectionState implements Watcher, Closeable
 
             case NEW_CONNECTION_STRING:
             {
-                handleNewConnectionString();
+                handleNewConnectionString(newConnectionString.get());
                 break;
             }
 
@@ -298,22 +300,34 @@ class ConnectionState implements Watcher, Closeable
         }
         }
 
-        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
+        if ( checkNewConnectionString )
         {
-            handleNewConnectionString();
+            String newConnectionString = zooKeeper.getNewConnectionString();
+            if ( newConnectionString != null )
+            {
+                handleNewConnectionString(newConnectionString);
+            }
         }
 
         return isConnected;
     }
 
-    private void handleNewConnectionString()
+    private void handleNewConnectionString(String newConnectionString)
     {
-        log.info("Connection string changed");
+        log.info("Connection string changed to: " + newConnectionString);
         tracer.get().addCount("connection-string-changed", 1);
 
         try
         {
-            reset();
+            ZooKeeper zooKeeper = this.zooKeeper.getZooKeeper();
+            if ( zooKeeper == null )
+            {
+                log.warn("Could not update the connection string because 
getZooKeeper() returned null.");
+            }
+            else
+            {
+                zooKeeper.updateServerList(newConnectionString);
+            }
         }
         catch ( Exception e )
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/HandleHolder.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java 
b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
index 8652f0c..98b39ce 100644
--- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java
+++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
@@ -67,10 +67,10 @@ class HandleHolder
         return (helper != null) ? helper.getConnectionString() : null;
     }
 
-    boolean hasNewConnectionString() 
+    String getNewConnectionString()
     {
         String helperConnectionString = (helper != null) ? 
helper.getConnectionString() : null;
-        return (helperConnectionString != null) && 
!ensembleProvider.getConnectionString().equals(helperConnectionString);
+        return ((helperConnectionString != null) && 
!ensembleProvider.getConnectionString().equals(helperConnectionString)) ? 
helperConnectionString : null;
     }
 
     void closeAndClear() throws Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index 8116308..f620ffb 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -56,14 +56,14 @@ public class ClassicConnectionHandlingPolicy implements 
ConnectionHandlingPolicy
     }
 
     @Override
-    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
+    public CheckTimeoutsResult checkTimeouts(Callable<String> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
     {
         CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
         int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
         long elapsed = System.currentTimeMillis() - connectionStartMs;
         if ( elapsed >= minTimeout )
         {
-            if ( hasNewConnectionString.call() )
+            if ( hasNewConnectionString.call() != null )
             {
                 result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
             }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
index c47577d..8f6a147 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -99,13 +99,13 @@ public interface ConnectionHandlingPolicy
      * Check timeouts. NOTE: this method is only called when an attempt to 
access to the ZooKeeper instances
      * is made and the connection has not completed.
      *
-     * @param hasNewConnectionString proc to call to check if there is a new 
connection string. Important: the internal state is cleared after
-     *                               this is called so you MUST handle the new 
connection string if <tt>true</tt> is returned
+     * @param getNewConnectionString proc to call to check if there is a new 
connection string. Important: the internal state is cleared after
+     *                               this is called so you MUST handle the new 
connection string if non null is returned
      * @param connectionStartMs the epoch/ms time that the connection was 
first initiated
      * @param sessionTimeoutMs the configured/negotiated session timeout in 
milliseconds
      * @param connectionTimeoutMs the configured connection timeout in 
milliseconds
      * @return result
      * @throws Exception errors
      */
-    CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception;
+    CheckTimeoutsResult checkTimeouts(Callable<String> getNewConnectionString, 
long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index 9f311de..6995815 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -76,9 +76,9 @@ public class StandardConnectionHandlingPolicy implements 
ConnectionHandlingPolic
     }
 
     @Override
-    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
+    public CheckTimeoutsResult checkTimeouts(Callable<String> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
     {
-        if ( hasNewConnectionString.call() )
+        if ( hasNewConnectionString.call() != null )
         {
             return CheckTimeoutsResult.NEW_CONNECTION_STRING;
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
deleted file mode 100644
index 8f963cd..0000000
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble;
-
-public interface EnsembleListener {
-
-    void connectionStringUpdated(String connectionString);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
index b118294..c03726f 100644
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
+++ 
b/curator-client/src/main/java/org/apache/curator/ensemble/EnsembleProvider.java
@@ -51,4 +51,6 @@ public interface EnsembleProvider extends Closeable
      * @throws IOException errors
      */
     public void         close() throws IOException;
+
+    public void setConnectionString(String connectionString);
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
deleted file mode 100644
index 70b755f..0000000
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/dynamic/DynamicEnsembleProvider.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.ensemble.dynamic;
-
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.ensemble.EnsembleProvider;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class DynamicEnsembleProvider implements EnsembleProvider, 
EnsembleListener {
-
-    private final AtomicReference<String> connectionString = new 
AtomicReference<String>();
-
-    /**
-     * The connection string to use
-     *
-     * @param connectionString connection string
-     */
-    public DynamicEnsembleProvider(String connectionString)
-    {
-        this.connectionString.set(Preconditions.checkNotNull(connectionString, 
"connectionString cannot be null"));
-    }
-
-    @Override
-    public void start() throws Exception {
-        // NOP
-    }
-
-    @Override
-    public String getConnectionString() {
-        return connectionString.get();
-    }
-
-    @Override
-    public void close() throws IOException {
-        // NOP
-    }
-
-    @Override
-    public void connectionStringUpdated(String connectionString) {
-        this.connectionString.set(connectionString);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
index 02a01e5..4cbf5ee 100644
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
+++ 
b/curator-client/src/main/java/org/apache/curator/ensemble/exhibitor/ExhibitorEnsembleProvider.java
@@ -145,6 +145,12 @@ public class ExhibitorEnsembleProvider implements 
EnsembleProvider
         return connectionString.get();
     }
 
+    @Override
+    public void setConnectionString(String connectionString)
+    {
+        log.info("setConnectionString received. Ignoring. " + 
connectionString);
+    }
+
     @VisibleForTesting
     protected void poll()
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
 
b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
index 411c712..159497d 100644
--- 
a/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
+++ 
b/curator-client/src/main/java/org/apache/curator/ensemble/fixed/FixedEnsembleProvider.java
@@ -21,13 +21,14 @@ package org.apache.curator.ensemble.fixed;
 import com.google.common.base.Preconditions;
 import org.apache.curator.ensemble.EnsembleProvider;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Standard ensemble provider that wraps a fixed connection string
  */
 public class FixedEnsembleProvider implements EnsembleProvider
 {
-    private final String connectionString;
+    private final AtomicReference<String> connectionString = new 
AtomicReference<>();
 
     /**
      * The connection string to use
@@ -36,7 +37,7 @@ public class FixedEnsembleProvider implements EnsembleProvider
      */
     public FixedEnsembleProvider(String connectionString)
     {
-        this.connectionString = Preconditions.checkNotNull(connectionString, 
"connectionString cannot be null");
+        this.connectionString.set(Preconditions.checkNotNull(connectionString, 
"connectionString cannot be null"));
     }
 
     @Override
@@ -52,8 +53,14 @@ public class FixedEnsembleProvider implements 
EnsembleProvider
     }
 
     @Override
+    public void setConnectionString(String connectionString)
+    {
+        this.connectionString.set(connectionString);
+    }
+
+    @Override
     public String getConnectionString()
     {
-        return connectionString;
+        return connectionString.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
deleted file mode 100644
index 375e1f0..0000000
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.framework.ensemble;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import org.apache.curator.ensemble.EnsembleListener;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tracks changes to the ensemble and notifies registered {@link 
org.apache.curator.ensemble.EnsembleListener} instances.
- */
-public class EnsembleTracker implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
-    private final ListenerContainer<EnsembleListener> listeners = new 
ListenerContainer<>();
-    private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
-    {
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState 
newState)
-        {
-            if ( (newState == ConnectionState.CONNECTED) || (newState == 
ConnectionState.RECONNECTED) )
-            {
-                try
-                {
-                    reset();
-                }
-                catch ( Exception e )
-                {
-                    log.error("Trying to reset after reconnection", e);
-                }
-            }
-        }
-    };
-
-    private final CuratorWatcher watcher = new CuratorWatcher()
-    {
-        @Override
-        public void process(WatchedEvent event) throws Exception
-        {
-            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
-            {
-                reset();
-            }
-        }
-    };
-
-    private enum State
-    {
-        LATENT,
-        STARTED,
-        CLOSED
-    }
-
-    private final BackgroundCallback backgroundCallback = new 
BackgroundCallback()
-    {
-        @Override
-        public void processResult(CuratorFramework client, CuratorEvent event) 
throws Exception
-        {
-            processBackgroundResult(event);
-        }
-    };
-
-    public EnsembleTracker(CuratorFramework client)
-    {
-        this.client = client;
-    }
-
-    public void start() throws Exception
-    {
-        Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
-        
client.getConnectionStateListenable().addListener(connectionStateListener);
-        reset();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
-        {
-            listeners.clear();
-        }
-        
client.getConnectionStateListenable().removeListener(connectionStateListener);
-    }
-
-    /**
-     * Return the ensemble listenable
-     *
-     * @return listenable
-     */
-    public ListenerContainer<EnsembleListener> getListenable()
-    {
-        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
-
-        return listeners;
-    }
-
-    private void reset() throws Exception
-    {
-        
client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
-    }
-
-    private void processBackgroundResult(CuratorEvent event) throws Exception
-    {
-        switch ( event.getType() )
-        {
-            case GET_CONFIG:
-            {
-                if ( event.getResultCode() == 
KeeperException.Code.OK.intValue() )
-                {
-                    processConfigData(event.getData());
-                }
-            }
-        }
-    }
-
-    private void processConfigData(byte[] data) throws Exception
-    {
-        Properties properties = new Properties();
-        properties.load(new ByteArrayInputStream(data));
-        QuorumVerifier qv = new QuorumMaj(properties);
-        StringBuilder sb = new StringBuilder();
-        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
-        {
-            if ( sb.length() != 0 )
-            {
-                sb.append(",");
-            }
-            
sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
-        }
-
-        final String connectionString = sb.toString();
-        listeners.forEach
-            (
-                new Function<EnsembleListener, Void>()
-                {
-                    @Override
-                    public Void apply(EnsembleListener listener)
-                    {
-                        try
-                        {
-                            listener.connectionStringUpdated(connectionString);
-                        }
-                        catch ( Exception e )
-                        {
-                            log.error("Calling listener", e);
-                        }
-                        return null;
-                    }
-                }
-            );
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index da9067d..f2f578c 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -88,6 +88,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ConnectionStateErrorPolicy connectionStateErrorPolicy;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
     private final InternalConnectionHandler internalConnectionHandler;
+    private final EnsembleTracker ensembleTracker;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new 
AtomicBoolean(false);
@@ -150,6 +151,8 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         failedDeleteManager = new FailedDeleteManager(this);
         failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
+
+        ensembleTracker = new EnsembleTracker(this, 
builder.getEnsembleProvider());
     }
 
     private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
@@ -217,6 +220,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
         connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
         internalConnectionHandler = parent.internalConnectionHandler;
+        ensembleTracker = null;
     }
 
     @Override
@@ -306,6 +310,8 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                     return null;
                 }
             });
+
+            ensembleTracker.start();
         }
         catch ( Exception e )
         {
@@ -351,6 +357,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                 }
             }
 
+            ensembleTracker.close();
             listeners.clear();
             unhandledErrorListeners.clear();
             connectionStateManager.close();

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
new file mode 100644
index 0000000..d8092fe
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+@VisibleForTesting
+public class EnsembleTracker implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final EnsembleProvider ensembleProvider;
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.LATENT);
+    private final ConnectionStateListener connectionStateListener = new 
ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState 
newState)
+        {
+            if ( (newState == ConnectionState.CONNECTED) || (newState == 
ConnectionState.RECONNECTED) )
+            {
+                try
+                {
+                    reset();
+                }
+                catch ( Exception e )
+                {
+                    log.error("Trying to reset after reconnection", e);
+                }
+            }
+        }
+    };
+
+    private final CuratorWatcher watcher = new CuratorWatcher()
+    {
+        @Override
+        public void process(WatchedEvent event) throws Exception
+        {
+            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
+            {
+                reset();
+            }
+        }
+    };
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    EnsembleTracker(CuratorFramework client, EnsembleProvider ensembleProvider)
+    {
+        this.client = client;
+        this.ensembleProvider = ensembleProvider;
+    }
+
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, 
State.STARTED), "Cannot be started more than once");
+        
client.getConnectionStateListenable().addListener(connectionStateListener);
+        reset();
+    }
+
+    @Override
+    public void close()
+    {
+        
client.getConnectionStateListenable().removeListener(connectionStateListener);
+    }
+
+    private void reset() throws Exception
+    {
+        BackgroundCallback backgroundCallback = new BackgroundCallback()
+        {
+            @Override
+            public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception
+            {
+                if ( event.getType() == CuratorEventType.GET_CONFIG )
+                {
+                    processConfigData(event.getData());
+                }
+            }
+        };
+        
client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble();
+    }
+
+    @VisibleForTesting
+    public static String configToConnectionString(byte[] data) throws Exception
+    {
+        Properties properties = new Properties();
+        properties.load(new ByteArrayInputStream(data));
+        QuorumVerifier qv = new QuorumMaj(properties);
+        StringBuilder sb = new StringBuilder();
+        for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() )
+        {
+            if ( sb.length() != 0 )
+            {
+                sb.append(",");
+            }
+            
sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort());
+        }
+
+        return sb.toString();
+    }
+
+    private void processConfigData(byte[] data) throws Exception
+    {
+        log.info("New config event received: " + Arrays.toString(data));
+        String connectionString = configToConnectionString(data);
+        ensembleProvider.setConnectionString(connectionString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
index e7d2229..101360a 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java
@@ -20,6 +20,7 @@
 package org.apache.curator.framework.imps;
 
 import com.google.common.collect.Lists;
+import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -43,17 +44,20 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestReconfiguration extends BaseClassForTests
 {
     private final Timing timing = new Timing();
     private TestingCluster cluster;
+    private EnsembleProvider ensembleProvider;
 
     @BeforeMethod
     @Override
@@ -72,6 +76,7 @@ public class TestReconfiguration extends BaseClassForTests
     public void teardown() throws Exception
     {
         CloseableUtils.closeQuietly(cluster);
+        ensembleProvider = null;
 
         super.teardown();
     }
@@ -151,9 +156,11 @@ public class TestReconfiguration extends BaseClassForTests
         try ( CuratorFramework client = newClient())
         {
             client.start();
-            QuorumVerifier quorumVerifier = 
toQuorumVerifier(client.getConfig().forEnsemble());
+            byte[] configData = client.getConfig().forEnsemble();
+            QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
             System.out.println(quorumVerifier);
             assertConfig(quorumVerifier, cluster.getInstances());
+            
Assert.assertEquals(EnsembleTracker.configToConnectionString(configData), 
ensembleProvider.getConnectionString());
         }
     }
 
@@ -176,10 +183,12 @@ public class TestReconfiguration extends BaseClassForTests
 
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = 
toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 List<InstanceSpec> newInstances = 
Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
+                
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), 
ensembleProvider.getConnectionString());
             }
         }
     }
@@ -216,10 +225,12 @@ public class TestReconfiguration extends BaseClassForTests
                 Assert.assertTrue(timing.awaitLatch(callbackLatch));
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = 
toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 List<InstanceSpec> newInstances = 
Lists.newArrayList(cluster.getInstances());
                 newInstances.addAll(newCluster.getInstances());
                 assertConfig(newConfig, newInstances);
+                
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), 
ensembleProvider.getConnectionString());
             }
         }
     }
@@ -254,11 +265,13 @@ public class TestReconfiguration extends BaseClassForTests
 
                 Assert.assertTrue(timing.awaitLatch(latch));
 
-                QuorumVerifier newConfig = 
toQuorumVerifier(client.getConfig().forEnsemble());
+                byte[] newConfigData = client.getConfig().forEnsemble();
+                QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
                 ArrayList<InstanceSpec> newInstances = 
Lists.newArrayList(oldInstances);
                 newInstances.addAll(instances);
                 newInstances.remove(removeSpec);
                 assertConfig(newConfig, newInstances);
+                
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), 
ensembleProvider.getConnectionString());
             }
         }
     }
@@ -290,15 +303,47 @@ public class TestReconfiguration extends BaseClassForTests
             
client.reconfig().withNewMembers(toReconfigSpec(smallCluster)).forEnsemble();
 
             Assert.assertTrue(timing.awaitLatch(latch));
-            QuorumVerifier newConfig = 
toQuorumVerifier(client.getConfig().forEnsemble());
+            byte[] newConfigData = client.getConfig().forEnsemble();
+            QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
             Assert.assertEquals(newConfig.getAllMembers().size(), 3);
             assertConfig(newConfig, smallCluster);
+            
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfigData), 
ensembleProvider.getConnectionString());
         }
     }
 
     private CuratorFramework newClient()
     {
-        return CuratorFrameworkFactory.newClient(cluster.getConnectString(), 
timing.session(), timing.connection(), new 
ExponentialBackoffRetry(timing.forSleepingABit().milliseconds(), 3));
+        final AtomicReference<String> connectString = new 
AtomicReference<>(cluster.getConnectString());
+        ensembleProvider = new EnsembleProvider()
+        {
+            @Override
+            public void start() throws Exception
+            {
+            }
+
+            @Override
+            public String getConnectionString()
+            {
+                return connectString.get();
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+            }
+
+            @Override
+            public void setConnectionString(String connectionString)
+            {
+                connectString.set(connectionString);
+            }
+        };
+        return CuratorFrameworkFactory.builder()
+            .ensembleProvider(ensembleProvider)
+            .sessionTimeoutMs(timing.session())
+            .connectionTimeoutMs(timing.connection())
+            .retryPolicy(new 
ExponentialBackoffRetry(timing.forSleepingABit().milliseconds(), 3))
+            .build();
     }
 
     private CountDownLatch setChangeWaiter(CuratorFramework client) throws 
Exception

http://git-wip-us.apache.org/repos/asf/curator/blob/26364c61/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index f4cb7bb..ee49288 100644
--- 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -65,6 +65,11 @@ public class TestInterProcessSemaphoreCluster
             final EnsembleProvider          provider = new EnsembleProvider()
             {
                 @Override
+                public void setConnectionString(String connectionString)
+                {
+                }
+
+                @Override
                 public void start() throws Exception
                 {
                 }

Reply via email to