First pass at new (optional) definition of state LOST
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/344634ac Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/344634ac Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/344634ac Branch: refs/heads/CURATOR-248 Commit: 344634ac6e34e61bc0cc7b41923a1df4089c7948 Parents: 7d97259 Author: randgalt <randg...@apache.org> Authored: Fri Aug 21 12:10:24 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Fri Aug 21 12:10:24 2015 -0500 ---------------------------------------------------------------------- .../framework/CuratorFrameworkFactory.java | 19 +++++ .../framework/api/UnhandledErrorListener.java | 4 +- .../framework/imps/CuratorFrameworkImpl.java | 10 ++- .../framework/state/ConnectionState.java | 35 +++++++-- .../framework/state/ConnectionStateManager.java | 75 +++++++++++++++----- 5 files changed, 113 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index dcb2ee6..6209b06 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -31,6 +31,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl; import org.apache.curator.framework.imps.CuratorTempFrameworkImpl; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.imps.GzipCompressionProvider; +import org.apache.curator.framework.state.ConnectionState; import 
org.apache.curator.utils.DefaultZookeeperFactory; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.CreateMode; @@ -116,6 +117,7 @@ public class CuratorFrameworkFactory private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true; + private boolean enableSessionExpiredState = false; /** * Apply the current values and build a new CuratorFramework @@ -343,6 +345,18 @@ public class CuratorFrameworkFactory return this; } + /** + * Changes the meaning of {@link ConnectionState#LOST} from its pre Curator 3.0.0 meaning + * to a true lost session state. See the {@link ConnectionState#LOST} doc for details. + * + * @return this + */ + public Builder enableSessionExpiredState() + { + this.enableSessionExpiredState = true; + return this; + } + public ACLProvider getAclProvider() { return aclProvider; @@ -398,6 +412,11 @@ public class CuratorFrameworkFactory return useContainerParentsIfAvailable; } + public boolean getEnableSessionExpiredState() + { + return enableSessionExpiredState; + } + @Deprecated public String getAuthScheme() { http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java index b463af2..3721d4b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java @@ -24,9 +24,7 @@ import org.apache.curator.framework.state.ConnectionStateListener; public interface UnhandledErrorListener { /** - * Called when an exception is caught in a background 
thread, handler, etc. Before this - * listener is called, the error will have been logged and a {@link ConnectionState#LOST} - * event will have been queued for any {@link ConnectionStateListener}s. + * Called when an exception is caught in a background thread, handler, etc. * * @param message Source message * @param e exception http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 41bb7cd..c64fb8f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -83,6 +83,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceFacadeCache namespaceFacadeCache; private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; + private final boolean enableSessionExpiredState; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -119,11 +120,12 @@ public class CuratorFrameworkImpl implements CuratorFramework namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getEnableSessionExpiredState(), builder.getSessionTimeoutMs()); compressionProvider = builder.getCompressionProvider(); aclProvider = 
builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); + enableSessionExpiredState = builder.getEnableSessionExpiredState(); byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; @@ -197,6 +199,7 @@ public class CuratorFrameworkImpl implements CuratorFramework state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; + enableSessionExpiredState = parent.enableSessionExpiredState; } @Override @@ -722,7 +725,10 @@ public class CuratorFrameworkImpl implements CuratorFramework return; } - doSyncForSuspendedConnection(client.getInstanceIndex()); + if ( !enableSessionExpiredState ) + { + doSyncForSuspendedConnection(client.getInstanceIndex()); + } } private void doSyncForSuspendedConnection(final long instanceIndex) http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java index 3ca1d66..49d0044 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java @@ -19,6 +19,8 @@ package org.apache.curator.framework.state; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; /** * Represents state changes in the connection to ZK @@ -39,8 +41,7 @@ public enum ConnectionState /** * There has been a loss 
of connection. Leaders, locks, etc. should suspend - * until the connection is re-established. If the connection times-out you will - * receive a {@link #LOST} notice + * until the connection is re-established. */ SUSPENDED { @@ -62,9 +63,29 @@ public enum ConnectionState }, /** - * The connection is confirmed to be lost. Close any locks, leaders, etc. and - * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED} - * state after this but you should still consider any locks, etc. as dirty/unstable + * <p> + * NOTE: the meaning of this state depends on how your CuratorFramework instance + * is created. + * </p> + * + * <p> + * The default meaning of LOST (and the only meaning prior to Curator 3.0.0) is: + * The connection is confirmed to be lost (i.e. the retry policy has given up). Close any locks, leaders, etc. and + * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED} + * state after this but you should still consider any locks, etc. as dirty/unstable + * </p> + * + * <p> + * <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by calling + * {@link CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode, + * Curator will set the LOST state only when it believes that the ZooKeeper session + * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate + * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper + * connection. In this mode, Curator will set the LOST state when any of the following occurs: + * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED}; + * b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout + * elapses during a network partition. 
+ * </p> */ LOST { @@ -87,7 +108,9 @@ public enum ConnectionState { return true; } - }; + } + + ; /** * Check if this state indicates that Curator has a connection to ZooKeeper http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 67ff13d..c0feb84 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** @@ -65,6 +66,8 @@ public class ConnectionStateManager implements Closeable private final Logger log = LoggerFactory.getLogger(getClass()); private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE); private final CuratorFramework client; + private final boolean enableSessionExpiredState; + private final int sessionTimeoutMs; private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; @@ -72,6 +75,8 @@ public class ConnectionStateManager implements Closeable // guarded by sync private ConnectionState currentConnectionState; + // guarded by sync + private long startOfSuspendedEpoch = 0; private enum State { @@ -83,10 +88,14 @@ 
public class ConnectionStateManager implements Closeable /** * @param client the client * @param threadFactory thread factory to use or null for a default + * @param enableSessionExpiredState if true, applies new meaning for LOST as described here: {@link ConnectionState#LOST} + * @param sessionTimeoutMs the ZK session timeout in milliseconds */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs) { this.client = client; + this.enableSessionExpiredState = enableSessionExpiredState; + this.sessionTimeoutMs = sessionTimeoutMs; if ( threadFactory == null ) { threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); @@ -137,7 +146,7 @@ public class ConnectionStateManager implements Closeable /** * Change to {@link ConnectionState#SUSPENDED} only if not already suspended and not lost - * + * * @return true if connection is set to SUSPENDED */ public synchronized boolean setToSuspended() @@ -152,7 +161,7 @@ public class ConnectionStateManager implements Closeable return false; } - currentConnectionState = ConnectionState.SUSPENDED; + setCurrentConnectionState(ConnectionState.SUSPENDED); postState(ConnectionState.SUSPENDED); return true; @@ -177,7 +186,7 @@ public class ConnectionStateManager implements Closeable { return false; } - currentConnectionState = newConnectionState; + setCurrentConnectionState(newConnectionState); ConnectionState localState = newConnectionState; boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY)); @@ -242,25 +251,34 @@ public class ConnectionStateManager implements Closeable { while ( !Thread.currentThread().isInterrupted() ) { - final ConnectionState newState = eventQueue.take(); - - if ( listeners.size() == 0 ) + final 
ConnectionState newState = eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS); + if ( newState != null ) { - log.warn("There are no ConnectionStateListeners registered."); - } + if ( listeners.size() == 0 ) + { + log.warn("There are no ConnectionStateListeners registered."); + } - listeners.forEach - ( - new Function<ConnectionStateListener, Void>() - { - @Override - public Void apply(ConnectionStateListener listener) + listeners.forEach + ( + new Function<ConnectionStateListener, Void>() { - listener.stateChanged(client, newState); - return null; + @Override + public Void apply(ConnectionStateListener listener) + { + listener.stateChanged(client, newState); + return null; + } } - } - ); + ); + } + else if ( enableSessionExpiredState ) + { + synchronized(this) + { + checkSessionExpiration(); + } + } } } catch ( InterruptedException e ) @@ -268,4 +286,23 @@ public class ConnectionStateManager implements Closeable Thread.currentThread().interrupt(); } } + + private void checkSessionExpiration() + { + if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) ) + { + long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch; + if ( elapsedMs >= sessionTimeoutMs ) + { + log.info(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs)); + addStateChange(ConnectionState.LOST); + } + } + } + + private void setCurrentConnectionState(ConnectionState newConnectionState) + { + currentConnectionState = newConnectionState; + startOfSuspendedEpoch = (currentConnectionState == ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0; + } }