Continued re-work on the connection state transitions involving 
background/async APIs.

ConnectionStateManager now uses synchronization. This shouldn't hurt 
performance but rationalizes state changes from foreground/background ops.

Background errors now go through the same code path as foreground errors. The
transition to LOST is now handled explicitly rather than generically in logError().


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/75acb0d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/75acb0d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/75acb0d9

Branch: refs/heads/master
Commit: 75acb0d9222c3a54c9e15edff319acedcb26bbf5
Parents: a937dfa
Author: randgalt <randg...@apache.org>
Authored: Tue Dec 24 17:54:55 2013 -0500
Committer: randgalt <randg...@apache.org>
Committed: Tue Dec 24 17:54:55 2013 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/HandleHolder.java   |   2 +-
 .../curator/framework/imps/Backgrounding.java   |   5 +
 .../framework/imps/CuratorFrameworkImpl.java    | 128 +++++++++++------
 .../framework/state/ConnectionStateManager.java |  44 +++++-
 .../framework/client/TestBackgroundStates.java  | 142 +++++++++++++++++++
 5 files changed, 268 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-client/src/main/java/org/apache/curator/HandleHolder.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
index 4922688..1f7cd91 100644
--- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java
+++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
@@ -52,7 +52,7 @@ class HandleHolder
 
     ZooKeeper getZooKeeper() throws Exception
     {
-        return helper.getZooKeeper();
+        return (helper != null) ? helper.getZooKeeper() : null;
     }
 
     String  getConnectionString()

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
index 6ae9151..262b2a8 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.imps;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.KeeperException;
 import java.util.concurrent.Executor;
 
 class Backgrounding
@@ -109,6 +110,10 @@ class Backgrounding
                             }
                             catch ( Exception e )
                             {
+                                if ( e instanceof KeeperException )
+                                {
+                                    
client.validateConnection(client.codeToState(((KeeperException)e).code()));
+                                }
                                 client.logError("Background operation result 
handling threw exception", e);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 3aa1097..1b0ef3f 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -512,24 +512,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
             log.error(reason, e);
         }
 
-        if ( e instanceof KeeperException.ConnectionLossException )
-        {
-            handleKeeperStateDisconnected();
-        }
-
         final String        localReason = reason;
         unhandledErrorListeners.forEach
-        (
-            new Function<UnhandledErrorListener, Void>()
-            {
-                @Override
-                public Void apply(UnhandledErrorListener listener)
+            (
+                new Function<UnhandledErrorListener, Void>()
                 {
-                    listener.unhandledError(localReason, e);
-                    return null;
+                    @Override
+                    public Void apply(UnhandledErrorListener listener)
+                    {
+                        listener.unhandledError(localReason, e);
+                        return null;
+                    }
                 }
-            }
-        );
+            );
     }
 
     String    unfixForNamespace(String path)
@@ -557,6 +552,73 @@ public class CuratorFrameworkImpl implements CuratorFramework
         return namespaceWatcherMap;
     }
 
+    void validateConnection(Watcher.Event.KeeperState state)
+    {
+        if ( state == Watcher.Event.KeeperState.Disconnected )
+        {
+            suspendConnection();
+        }
+        else if ( state == Watcher.Event.KeeperState.Expired )
+        {
+            connectionStateManager.addStateChange(ConnectionState.LOST);
+        }
+        else if ( state == Watcher.Event.KeeperState.SyncConnected )
+        {
+            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
+        }
+        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
+        {
+            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
+        }
+    }
+
+    Watcher.Event.KeeperState codeToState(KeeperException.Code code)
+    {
+        switch ( code )
+        {
+        case AUTHFAILED:
+        case NOAUTH:
+        {
+            return Watcher.Event.KeeperState.AuthFailed;
+        }
+
+        case CONNECTIONLOSS:
+        case OPERATIONTIMEOUT:
+        {
+            return Watcher.Event.KeeperState.Disconnected;
+        }
+
+        case SESSIONEXPIRED:
+        {
+            return Watcher.Event.KeeperState.Expired;
+        }
+
+        case OK:
+        case SESSIONMOVED:
+        {
+            return Watcher.Event.KeeperState.SyncConnected;
+        }
+        }
+        return Watcher.Event.KeeperState.fromInt(-1);
+    }
+
+    private void suspendConnection()
+    {
+        connectionStateManager.setToSuspended();
+
+        // we appear to have disconnected, force a new ZK event and see if we 
can connect to another server
+        BackgroundOperation<String> operation = new BackgroundSyncImpl(this, 
null);
+        OperationAndData.ErrorCallback<String> errorCallback = new 
OperationAndData.ErrorCallback<String>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<String> 
operationAndData)
+            {
+                connectionStateManager.addStateChange(ConnectionState.LOST);
+            }
+        };
+        performBackgroundOperation(new OperationAndData<String>(operation, 
"/", null, errorCallback, null));
+    }
+
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
     private <DATA_TYPE> boolean 
checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent 
event)
     {
@@ -588,8 +650,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
             if ( e == null )
             {
-                e = new Exception("Unknown result code: " + 
event.getResultCode());
+                e = new Exception("Unknown result codegetResultCode()");
             }
+
+            validateConnection(codeToState(code));
             logError("Background operation retry gave up", e);
         }
         return doRetry;
@@ -714,7 +778,10 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
     private void processEvent(final CuratorEvent curatorEvent)
     {
-        validateConnection(curatorEvent);
+        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
+        {
+            validateConnection(curatorEvent.getWatchedEvent().getState());
+        }
 
         listeners.forEach
         (
@@ -738,33 +805,4 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         );
     }
-
-    private void validateConnection(CuratorEvent curatorEvent)
-    {
-        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
-        {
-            if ( curatorEvent.getWatchedEvent().getState() == 
Watcher.Event.KeeperState.Disconnected )
-            {
-                handleKeeperStateDisconnected();
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == 
Watcher.Event.KeeperState.Expired )
-            {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == 
Watcher.Event.KeeperState.SyncConnected )
-            {
-                
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
-            }
-            else if ( curatorEvent.getWatchedEvent().getState() == 
Watcher.Event.KeeperState.ConnectedReadOnly )
-            {
-                
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
-            }
-        }
-    }
-
-    private void handleKeeperStateDisconnected()
-    {
-        connectionStateManager.addStateChange(ConnectionState.SUSPENDED);
-        internalSync(this, "/", null);  // we appear to have disconnected, 
force a new ZK event and see if we can connect to another server
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index fe5f18a..a2cfa60 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -65,11 +65,13 @@ public class ConnectionStateManager implements Closeable
     private final BlockingQueue<ConnectionState> eventQueue = new 
ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
     private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>();
-    private final AtomicReference<ConnectionState> currentState = new 
AtomicReference<ConnectionState>();
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
     private final ExecutorService service;
     private final AtomicReference<State> state = new 
AtomicReference<State>(State.LATENT);
 
+    // guarded by sync
+    private ConnectionState currentConnectionState;
+
     private enum State
     {
         LATENT,
@@ -133,23 +135,44 @@ public class ConnectionStateManager implements Closeable
     }
 
     /**
+     * Change to {@link ConnectionState#SUSPENDED} only if not already 
suspended and not lost
+     */
+    public synchronized void setToSuspended()
+    {
+        if ( state.get() != State.STARTED )
+        {
+            return;
+        }
+
+        if ( (currentConnectionState == ConnectionState.LOST) || 
(currentConnectionState == ConnectionState.SUSPENDED) )
+        {
+            return;
+        }
+
+        currentConnectionState = ConnectionState.SUSPENDED;
+        postState(ConnectionState.SUSPENDED);
+    }
+
+    /**
      * Post a state change. If the manager is already in that state the change
      * is ignored. Otherwise the change is queued for listeners.
      *
      * @param newConnectionState new state
+     * @return true if the state actually changed, false if it was already at 
that state
      */
-    public void addStateChange(ConnectionState newConnectionState)
+    public synchronized boolean addStateChange(ConnectionState 
newConnectionState)
     {
         if ( state.get() != State.STARTED )
         {
-            return;
+            return false;
         }
 
-        ConnectionState previousState = 
currentState.getAndSet(newConnectionState);
+        ConnectionState previousState = currentConnectionState;
         if ( previousState == newConnectionState )
         {
-            return;
+            return false;
         }
+        currentConnectionState = newConnectionState;
 
         ConnectionState localState = newConnectionState;
         boolean isNegativeMessage = ((newConnectionState == 
ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED));
@@ -158,8 +181,15 @@ public class ConnectionStateManager implements Closeable
             localState = ConnectionState.CONNECTED;
         }
 
-        log.info("State change: " + localState);
-        while ( !eventQueue.offer(localState) )
+        postState(localState);
+
+        return true;
+    }
+
+    private void postState(ConnectionState state)
+    {
+        log.info("State change: " + state);
+        while ( !eventQueue.offer(state) )
         {
             eventQueue.poll();
             log.warn("ConnectionStateManager queue full - dropping events to 
make room");

http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
new file mode 100644
index 0000000..b1c382f
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.client;
+
+import com.google.common.collect.Queues;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.BaseClassForTests;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.DebugUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+// NOTE: these tests are in Framework as they use the PersistentEphemeralNode 
recipe
+
+public class TestBackgroundStates extends BaseClassForTests
+{
+    @Test
+    public void testListenersReconnectedIsOK() throws Exception
+    {
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, 
PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectedLatch = new CountDownLatch(1);
+            final AtomicReference<ConnectionState> lastState = new 
AtomicReference<ConnectionState>();
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    lastState.set(newState);
+                    if ( newState == ConnectionState.CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    if ( newState == ConnectionState.RECONNECTED )
+                    {
+                        reconnectedLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            Assert.assertTrue(timing.awaitLatch(connectedLatch));
+            timing.sleepABit();
+            
Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS));
+            server.close();
+            timing.sleepABit();
+            server = new TestingServer(server.getPort());
+            timing.sleepABit();
+            Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
+            timing.sleepABit();
+            Assert.assertEquals(lastState.get(), ConnectionState.RECONNECTED);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testConnectionStateListener() throws Exception
+    {
+        System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
+
+        server.close();
+
+        Timing timing = new Timing();
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(timing.milliseconds()));
+        try
+        {
+            client.start();
+            PersistentEphemeralNode node = new PersistentEphemeralNode(client, 
PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+
+            final BlockingQueue<ConnectionState> stateVector = 
Queues.newLinkedBlockingQueue(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                {
+                    stateVector.offer(newState);
+                }
+            };
+
+            Timing waitingTiming = timing.forWaiting();
+
+            client.getConnectionStateListenable().addListener(listener);
+            server = new TestingServer(server.getPort());
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
+            server.close();
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
+            server = new TestingServer(server.getPort());
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);
+            server.close();
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
+            Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        }
+        finally
+        {
+            Closeables.closeQuietly(client);
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to