Continued re-work on the connection state transitions involving background/async APIs.
ConnectionStateManager now uses synchronization. This shouldn't hurt performance, and it rationalizes state changes coming from foreground and background operations. Background errors now go through the same code as foreground errors. The transition to LOST is now handled specifically instead of generally in logError(). Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/75acb0d9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/75acb0d9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/75acb0d9 Branch: refs/heads/master Commit: 75acb0d9222c3a54c9e15edff319acedcb26bbf5 Parents: a937dfa Author: randgalt <randg...@apache.org> Authored: Tue Dec 24 17:54:55 2013 -0500 Committer: randgalt <randg...@apache.org> Committed: Tue Dec 24 17:54:55 2013 -0500 ---------------------------------------------------------------------- .../java/org/apache/curator/HandleHolder.java | 2 +- .../curator/framework/imps/Backgrounding.java | 5 + .../framework/imps/CuratorFrameworkImpl.java | 128 +++++++++++------ .../framework/state/ConnectionStateManager.java | 44 +++++- .../framework/client/TestBackgroundStates.java | 142 +++++++++++++++++++ 5 files changed, 268 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-client/src/main/java/org/apache/curator/HandleHolder.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java b/curator-client/src/main/java/org/apache/curator/HandleHolder.java index 4922688..1f7cd91 100644 --- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java +++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java @@ -52,7 +52,7 @@ class HandleHolder ZooKeeper getZooKeeper() throws Exception { - return helper.getZooKeeper(); + return (helper != null) ? 
helper.getZooKeeper() : null; } String getConnectionString() http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java index 6ae9151..262b2a8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.imps; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.KeeperException; import java.util.concurrent.Executor; class Backgrounding @@ -109,6 +110,10 @@ class Backgrounding } catch ( Exception e ) { + if ( e instanceof KeeperException ) + { + client.validateConnection(client.codeToState(((KeeperException)e).code())); + } client.logError("Background operation result handling threw exception", e); } } http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 3aa1097..1b0ef3f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -512,24 +512,19 @@ public class CuratorFrameworkImpl implements 
CuratorFramework log.error(reason, e); } - if ( e instanceof KeeperException.ConnectionLossException ) - { - handleKeeperStateDisconnected(); - } - final String localReason = reason; unhandledErrorListeners.forEach - ( - new Function<UnhandledErrorListener, Void>() - { - @Override - public Void apply(UnhandledErrorListener listener) + ( + new Function<UnhandledErrorListener, Void>() { - listener.unhandledError(localReason, e); - return null; + @Override + public Void apply(UnhandledErrorListener listener) + { + listener.unhandledError(localReason, e); + return null; + } } - } - ); + ); } String unfixForNamespace(String path) @@ -557,6 +552,73 @@ public class CuratorFrameworkImpl implements CuratorFramework return namespaceWatcherMap; } + void validateConnection(Watcher.Event.KeeperState state) + { + if ( state == Watcher.Event.KeeperState.Disconnected ) + { + suspendConnection(); + } + else if ( state == Watcher.Event.KeeperState.Expired ) + { + connectionStateManager.addStateChange(ConnectionState.LOST); + } + else if ( state == Watcher.Event.KeeperState.SyncConnected ) + { + connectionStateManager.addStateChange(ConnectionState.RECONNECTED); + } + else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) + { + connectionStateManager.addStateChange(ConnectionState.READ_ONLY); + } + } + + Watcher.Event.KeeperState codeToState(KeeperException.Code code) + { + switch ( code ) + { + case AUTHFAILED: + case NOAUTH: + { + return Watcher.Event.KeeperState.AuthFailed; + } + + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + { + return Watcher.Event.KeeperState.Disconnected; + } + + case SESSIONEXPIRED: + { + return Watcher.Event.KeeperState.Expired; + } + + case OK: + case SESSIONMOVED: + { + return Watcher.Event.KeeperState.SyncConnected; + } + } + return Watcher.Event.KeeperState.fromInt(-1); + } + + private void suspendConnection() + { + connectionStateManager.setToSuspended(); + + // we appear to have disconnected, force a new ZK event and see if we can connect 
to another server + BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null); + OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>() + { + @Override + public void retriesExhausted(OperationAndData<String> operationAndData) + { + connectionStateManager.addStateChange(ConnectionState.LOST); + } + }; + performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null)); + } + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"}) private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) { @@ -588,8 +650,10 @@ public class CuratorFrameworkImpl implements CuratorFramework } if ( e == null ) { - e = new Exception("Unknown result code: " + event.getResultCode()); + e = new Exception("Unknown result codegetResultCode()"); } + + validateConnection(codeToState(code)); logError("Background operation retry gave up", e); } return doRetry; @@ -714,7 +778,10 @@ public class CuratorFrameworkImpl implements CuratorFramework private void processEvent(final CuratorEvent curatorEvent) { - validateConnection(curatorEvent); + if ( curatorEvent.getType() == CuratorEventType.WATCHED ) + { + validateConnection(curatorEvent.getWatchedEvent().getState()); + } listeners.forEach ( @@ -738,33 +805,4 @@ public class CuratorFrameworkImpl implements CuratorFramework } ); } - - private void validateConnection(CuratorEvent curatorEvent) - { - if ( curatorEvent.getType() == CuratorEventType.WATCHED ) - { - if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Disconnected ) - { - handleKeeperStateDisconnected(); - } - else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.Expired ) - { - connectionStateManager.addStateChange(ConnectionState.LOST); - } - else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.SyncConnected ) - { - 
connectionStateManager.addStateChange(ConnectionState.RECONNECTED); - } - else if ( curatorEvent.getWatchedEvent().getState() == Watcher.Event.KeeperState.ConnectedReadOnly ) - { - connectionStateManager.addStateChange(ConnectionState.READ_ONLY); - } - } - } - - private void handleKeeperStateDisconnected() - { - connectionStateManager.addStateChange(ConnectionState.SUSPENDED); - internalSync(this, "/", null); // we appear to have disconnected, force a new ZK event and see if we can connect to another server - } } http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index fe5f18a..a2cfa60 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -65,11 +65,13 @@ public class ConnectionStateManager implements Closeable private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE); private final CuratorFramework client; private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>(); - private final AtomicReference<ConnectionState> currentState = new AtomicReference<ConnectionState>(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); + // guarded by sync + private ConnectionState currentConnectionState; + private enum State { LATENT, @@ -133,23 +135,44 @@ public class 
ConnectionStateManager implements Closeable } /** + * Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost + */ + public synchronized void setToSuspended() + { + if ( state.get() != State.STARTED ) + { + return; + } + + if ( (currentConnectionState == ConnectionState.LOST) || (currentConnectionState == ConnectionState.SUSPENDED) ) + { + return; + } + + currentConnectionState = ConnectionState.SUSPENDED; + postState(ConnectionState.SUSPENDED); + } + + /** * Post a state change. If the manager is already in that state the change * is ignored. Otherwise the change is queued for listeners. * * @param newConnectionState new state + * @return true if the state actually changed, false if it was already at that state */ - public void addStateChange(ConnectionState newConnectionState) + public synchronized boolean addStateChange(ConnectionState newConnectionState) { if ( state.get() != State.STARTED ) { - return; + return false; } - ConnectionState previousState = currentState.getAndSet(newConnectionState); + ConnectionState previousState = currentConnectionState; if ( previousState == newConnectionState ) { - return; + return false; } + currentConnectionState = newConnectionState; ConnectionState localState = newConnectionState; boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED)); @@ -158,8 +181,15 @@ public class ConnectionStateManager implements Closeable localState = ConnectionState.CONNECTED; } - log.info("State change: " + localState); - while ( !eventQueue.offer(localState) ) + postState(localState); + + return true; + } + + private void postState(ConnectionState state) + { + log.info("State change: " + state); + while ( !eventQueue.offer(state) ) { eventQueue.poll(); log.warn("ConnectionStateManager queue full - dropping events to make room"); 
http://git-wip-us.apache.org/repos/asf/curator/blob/75acb0d9/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java new file mode 100644 index 0000000..b1c382f --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.curator.framework.client; + +import com.google.common.collect.Queues; +import com.google.common.io.Closeables; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.BaseClassForTests; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.TestingServer; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.DebugUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +// NOTE: these tests are in Framework as they use the PersistentEphemeralNode recipe + +public class TestBackgroundStates extends BaseClassForTests +{ + @Test + public void testListenersReconnectedIsOK() throws Exception + { + server.close(); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + try + { + client.start(); + PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()); + node.start(); + + final CountDownLatch connectedLatch = new CountDownLatch(1); + final CountDownLatch reconnectedLatch = new CountDownLatch(1); + final AtomicReference<ConnectionState> lastState = new AtomicReference<ConnectionState>(); + ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + lastState.set(newState); + if ( newState == 
ConnectionState.CONNECTED ) + { + connectedLatch.countDown(); + } + if ( newState == ConnectionState.RECONNECTED ) + { + reconnectedLatch.countDown(); + } + } + }; + client.getConnectionStateListenable().addListener(listener); + timing.sleepABit(); + server = new TestingServer(server.getPort()); + Assert.assertTrue(timing.awaitLatch(connectedLatch)); + timing.sleepABit(); + Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)); + server.close(); + timing.sleepABit(); + server = new TestingServer(server.getPort()); + timing.sleepABit(); + Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); + timing.sleepABit(); + Assert.assertEquals(lastState.get(), ConnectionState.RECONNECTED); + } + finally + { + Closeables.closeQuietly(client); + } + } + + @Test + public void testConnectionStateListener() throws Exception + { + System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true"); + + server.close(); + + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(timing.milliseconds())); + try + { + client.start(); + PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes()); + node.start(); + + final BlockingQueue<ConnectionState> stateVector = Queues.newLinkedBlockingQueue(1); + ConnectionStateListener listener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + stateVector.offer(newState); + } + }; + + Timing waitingTiming = timing.forWaiting(); + + client.getConnectionStateListenable().addListener(listener); + server = new TestingServer(server.getPort()); + Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); + server.close(); + 
Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + server = new TestingServer(server.getPort()); + Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED); + server.close(); + Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); + Assert.assertEquals(stateVector.poll(waitingTiming.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + } + finally + { + Closeables.closeQuietly(client); + } + } + +} \ No newline at end of file