Further refactoring: abstracted the old framework-level connection handling into ClassicInternalConnectionHandler. There is probably more to do here.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30bd7b65 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30bd7b65 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30bd7b65 Branch: refs/heads/CURATOR-3.0 Commit: 30bd7b655d201762d8ff74062964621879ac7134 Parents: e239137 Author: randgalt <randg...@apache.org> Authored: Sat Aug 22 19:29:36 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Aug 22 19:29:36 2015 -0500 ---------------------------------------------------------------------- .../imps/ClassicInternalConnectionHandler.java | 58 ++++++++++++++++++ .../framework/imps/CuratorFrameworkImpl.java | 64 ++++++-------------- .../imps/InternalConnectionHandler.java | 10 +++ .../imps/StandardInternalConnectionHandler.java | 22 +++++++ .../framework/state/ConnectionStateManager.java | 8 ++- 5 files changed, 112 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java new file mode 100644 index 0000000..1de6e80 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java @@ -0,0 +1,58 @@ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.state.ConnectionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class ClassicInternalConnectionHandler implements InternalConnectionHandler +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override 
+ public void checkNewConnection(CuratorFrameworkImpl client) + { + // NOP + } + + @Override + public boolean checkSessionExpirationEnabled() + { + return false; + } + + @Override + public void suspendConnection(CuratorFrameworkImpl client) + { + if ( client.setToSuspended() ) + { + doSyncForSuspendedConnection(client, client.getZookeeperClient().getInstanceIndex()); + } + } + + private void doSyncForSuspendedConnection(final CuratorFrameworkImpl client, final long instanceIndex) + { + // we appear to have disconnected, force a new ZK event and see if we can connect to another server + final BackgroundOperation<String> operation = new BackgroundSyncImpl(client, null); + OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>() + { + @Override + public void retriesExhausted(OperationAndData<String> operationAndData) + { + // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated + // so the pending background sync is no longer valid. + // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost + if ( (instanceIndex < 0) || (instanceIndex == client.getZookeeperClient().getInstanceIndex()) ) + { + client.addStateChange(ConnectionState.LOST); + } + else + { + log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. 
Retrying."); + // send -1 to signal that if it happens again, punt and mark the connection lost + doSyncForSuspendedConnection(client, -1); + } + } + }; + client.performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null)); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 44a8ec6..b04987d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -85,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; private final AtomicLong currentInstanceIndex = new AtomicLong(-1); + private final InternalConnectionHandler internalConnectionHandler; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -125,13 +126,14 @@ public class CuratorFrameworkImpl implements CuratorFramework builder.getConnectionHandlingPolicy() ); + internalConnectionHandler = builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? 
new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler(); listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>(); backgroundOperations = new DelayQueue<OperationAndData<?>>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), internalConnectionHandler.checkSessionExpirationEnabled()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); @@ -209,6 +211,7 @@ public class CuratorFrameworkImpl implements CuratorFramework state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; + internalConnectionHandler = parent.internalConnectionHandler; } @Override @@ -676,7 +679,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { if ( state == Watcher.Event.KeeperState.Disconnected ) { - suspendConnection(); + internalConnectionHandler.suspendConnection(this); } else if ( state == Watcher.Event.KeeperState.Expired ) { @@ -684,26 +687,23 @@ public class CuratorFrameworkImpl implements CuratorFramework } else if ( state == Watcher.Event.KeeperState.SyncConnected ) { - checkNewConnection(); + internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.RECONNECTED); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { - checkNewConnection(); + internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.READ_ONLY); } } - private void 
checkNewConnection() + void checkInstanceIndex() { - if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) + long instanceIndex = client.getInstanceIndex(); + long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); + if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this { - long instanceIndex = client.getInstanceIndex(); - long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); - if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this - { - connectionStateManager.addStateChange(ConnectionState.LOST); - } + connectionStateManager.addStateChange(ConnectionState.LOST); } } @@ -742,44 +742,14 @@ public class CuratorFrameworkImpl implements CuratorFramework return null; } - private void suspendConnection() + boolean setToSuspended() { - if ( !connectionStateManager.setToSuspended() ) - { - return; - } - - if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) - { - doSyncForSuspendedConnection(client.getInstanceIndex()); - } + return connectionStateManager.setToSuspended(); } - private void doSyncForSuspendedConnection(final long instanceIndex) + void addStateChange(ConnectionState newConnectionState) { - // we appear to have disconnected, force a new ZK event and see if we can connect to another server - final BackgroundOperation<String> operation = new BackgroundSyncImpl(this, null); - OperationAndData.ErrorCallback<String> errorCallback = new OperationAndData.ErrorCallback<String>() - { - @Override - public void retriesExhausted(OperationAndData<String> operationAndData) - { - // if instanceIndex != newInstanceIndex, the ZooKeeper instance was reset/reallocated - // so the pending background sync is no longer valid. 
- // if instanceIndex is -1, this is the second try to sync - punt and mark the connection lost - if ( (instanceIndex < 0) || (instanceIndex == client.getInstanceIndex()) ) - { - connectionStateManager.addStateChange(ConnectionState.LOST); - } - else - { - log.debug("suspendConnection() failure ignored as the ZooKeeper instance was reset. Retrying."); - // send -1 to signal that if it happens again, punt and mark the connection lost - doSyncForSuspendedConnection(-1); - } - } - }; - performBackgroundOperation(new OperationAndData<String>(operation, "/", null, errorCallback, null)); + connectionStateManager.addStateChange(newConnectionState); } @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"}) @@ -894,7 +864,7 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - private void performBackgroundOperation(OperationAndData<?> operationAndData) + void performBackgroundOperation(OperationAndData<?> operationAndData) { try { http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java new file mode 100644 index 0000000..e9798d7 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java @@ -0,0 +1,10 @@ +package org.apache.curator.framework.imps; + +interface InternalConnectionHandler +{ + void checkNewConnection(CuratorFrameworkImpl client); + + void suspendConnection(CuratorFrameworkImpl client); + + boolean checkSessionExpirationEnabled(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java 
---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java new file mode 100644 index 0000000..b0452c6 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java @@ -0,0 +1,22 @@ +package org.apache.curator.framework.imps; + +class StandardInternalConnectionHandler implements InternalConnectionHandler +{ + @Override + public void suspendConnection(CuratorFrameworkImpl client) + { + client.setToSuspended(); + } + + @Override + public boolean checkSessionExpirationEnabled() + { + return true; + } + + @Override + public void checkNewConnection(CuratorFrameworkImpl client) + { + client.checkInstanceIndex(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 2e7492f..406099d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -21,7 +21,6 @@ package org.apache.curator.framework.state; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import org.apache.curator.connection.ConnectionHandlingPolicyStyle; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.utils.ThreadUtils; @@ -67,6 +66,7 @@ public class 
ConnectionStateManager implements Closeable private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE); private final CuratorFramework client; private final int sessionTimeoutMs; + private final boolean checkSessionExpiration; private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); private final ExecutorService service; @@ -88,11 +88,13 @@ public class ConnectionStateManager implements Closeable * @param client the client * @param threadFactory thread factory to use or null for a default * @param sessionTimeoutMs the ZK session timeout in milliseconds + * @param checkSessionExpiration if true, check for session timeouts, etc. ala new connection handling method */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration) { this.client = client; this.sessionTimeoutMs = sessionTimeoutMs; + this.checkSessionExpiration = checkSessionExpiration; if ( threadFactory == null ) { threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager"); @@ -270,7 +272,7 @@ public class ConnectionStateManager implements Closeable } ); } - else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling() ) + else if ( checkSessionExpiration ) { synchronized(this) {