Major refactoring: abstracted the old and new connection-handling behavior into a pluggable ConnectionHandlingPolicy. IMPORTANT: the new behavior is now the default. This needs to be discussed, but it is a major improvement and we should default to it.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e2391370 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e2391370 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e2391370 Branch: refs/heads/CURATOR-248 Commit: e239137019608f02cabb23c27ab13adcef88c027 Parents: 6381ccb Author: randgalt <randg...@apache.org> Authored: Sat Aug 22 19:06:55 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Aug 22 19:06:55 2015 -0500 ---------------------------------------------------------------------- .../org/apache/curator/ConnectionState.java | 85 ++++++++++++-------- .../apache/curator/CuratorZookeeperClient.java | 32 ++++---- .../main/java/org/apache/curator/RetryLoop.java | 28 +++++-- .../ClassicConnectionHandlingPolicy.java | 48 +++++++++++ .../connection/ConnectionHandlingPolicy.java | 84 +++++++++++++++++++ .../StandardConnectionHandlingPolicy.java | 35 ++++++++ .../java/org/apache/curator/TestEnsurePath.java | 5 +- .../framework/CuratorFrameworkFactory.java | 54 +++++++++++-- .../framework/imps/CuratorFrameworkImpl.java | 43 +++++----- .../framework/state/ConnectionState.java | 20 +---- .../framework/state/ConnectionStateManager.java | 9 +-- .../imps/TestEnabledSessionExpiredState.java | 5 +- .../apache/curator/test/BaseClassForTests.java | 6 +- 13 files changed, 336 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java index c3d6921..d6ddd33 100644 --- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java +++ 
b/curator-client/src/main/java/org/apache/curator/ConnectionState.java @@ -18,9 +18,10 @@ */ package org.apache.curator; -import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.connection.ConnectionHandlingPolicy; import org.apache.curator.drivers.TracerDriver; import org.apache.curator.ensemble.EnsembleProvider; +import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.KeeperException; @@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,18 +50,19 @@ class ConnectionState implements Watcher, Closeable private final int sessionTimeoutMs; private final int connectionTimeoutMs; private final AtomicReference<TracerDriver> tracer; + private final ConnectionHandlingPolicy connectionHandlingPolicy; private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>(); private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>(); private final AtomicLong instanceIndex = new AtomicLong(); private volatile long connectionStartMs = 0; - private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true); - ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) + ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) { this.ensembleProvider = ensembleProvider; 
this.sessionTimeoutMs = sessionTimeoutMs; this.connectionTimeoutMs = connectionTimeoutMs; this.tracer = tracer; + this.connectionHandlingPolicy = connectionHandlingPolicy; if ( parentWatcher != null ) { parentWatchers.offer(parentWatcher); @@ -68,11 +71,6 @@ class ConnectionState implements Watcher, Closeable zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly); } - void disableTimeoutChecks() - { - enableTimeoutChecks.set(false); - } - ZooKeeper getZooKeeper() throws Exception { if ( SessionFailRetryLoop.sessionForThreadHasFailed() ) @@ -87,13 +85,10 @@ class ConnectionState implements Watcher, Closeable throw exception; } - if ( enableTimeoutChecks.get() ) + boolean localIsConnected = isConnected.get(); + if ( !localIsConnected ) { - boolean localIsConnected = isConnected.get(); - if ( !localIsConnected ) - { - checkTimeouts(); - } + checkTimeouts(); } return zooKeeper.getZooKeeper(); @@ -194,35 +189,57 @@ class ConnectionState implements Watcher, Closeable private synchronized void checkTimeouts() throws Exception { - int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs); - long elapsed = System.currentTimeMillis() - connectionStartMs; - if ( elapsed >= minTimeout ) + Callable<Boolean> hasNewConnectionString = new Callable<Boolean>() + { + @Override + public Boolean call() + { + return zooKeeper.hasNewConnectionString(); + } + }; + ConnectionHandlingPolicy.CheckTimeoutsResult result = connectionHandlingPolicy.checkTimeouts(hasNewConnectionString, connectionStartMs, sessionTimeoutMs, connectionTimeoutMs); + switch ( result ) { - if ( zooKeeper.hasNewConnectionString() ) + default: + case NOP: + { + break; + } + + case NEW_CONNECTION_STRING: { handleNewConnectionString(); + break; } - else + + case RESET_CONNECTION: { - int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs); - if ( elapsed > maxTimeout ) + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { - if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout)); - } - reset(); + long elapsed = System.currentTimeMillis() - connectionStartMs; + int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs); + log.warn(String.format("Connection attempt unsuccessful after %d (greater than max timeout of %d). Resetting connection and trying again with a new connection.", elapsed, maxTimeout)); } - else + reset(); + break; + } + + case CONNECTION_TIMEOUT: + { + KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException(); + if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) { - KeeperException.ConnectionLossException connectionLossException = new CuratorConnectionLossException(); - if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) ) - { - log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException); - } - tracer.get().addCount("connections-timed-out", 1); - throw connectionLossException; + long elapsed = System.currentTimeMillis() - connectionStartMs; + log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException); } + tracer.get().addCount("connections-timed-out", 1); + throw connectionLossException; + } + + case SESSION_TIMEOUT: + { + handleExpiredSession(); + break; } } } http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java ---------------------------------------------------------------------- diff --git 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java index a065d78..9342acf 100644 --- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java +++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java @@ -21,6 +21,8 @@ package org.apache.curator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.curator.connection.ClassicConnectionHandlingPolicy; +import org.apache.curator.connection.ConnectionHandlingPolicy; import org.apache.curator.drivers.TracerDriver; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; @@ -51,7 +53,7 @@ public class CuratorZookeeperClient implements Closeable private final int connectionTimeoutMs; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicReference<TracerDriver> tracer = new AtomicReference<TracerDriver>(new DefaultTracerDriver()); - private final boolean manageTimeouts; + private final ConnectionHandlingPolicy connectionHandlingPolicy; private final AtomicReference<Exception> debugException = new AtomicReference<>(); /** @@ -64,7 +66,7 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy) { - this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true); + this(new DefaultZookeeperFactory(), new FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy()); } /** @@ -76,7 +78,7 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(EnsembleProvider ensembleProvider, 
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy) { - this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true); + this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy()); } /** @@ -93,7 +95,7 @@ public class CuratorZookeeperClient implements Closeable */ public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly) { - this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, true); + this(new DefaultZookeeperFactory(), ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new ClassicConnectionHandlingPolicy()); } /** @@ -107,11 +109,12 @@ public class CuratorZookeeperClient implements Closeable * read only mode in case of a network partition. See * {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)} * for details - * @param manageTimeouts in general, Curator clients try to manage session/connection timeouts. 
If this is false, that management is turned off + * @param connectionHandlingPolicy connection handling policy - use one of the pre-defined policies or write your own + * @since 3.0.0 */ - public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, boolean manageTimeouts) + public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) { - this.manageTimeouts = manageTimeouts; + this.connectionHandlingPolicy = connectionHandlingPolicy; if ( sessionTimeoutMs < connectionTimeoutMs ) { log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs)); @@ -121,11 +124,7 @@ public class CuratorZookeeperClient implements Closeable ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null"); this.connectionTimeoutMs = connectionTimeoutMs; - state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly); - if ( !manageTimeouts ) - { - state.disableTimeoutChecks(); - } + state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy); setRetryPolicy(retryPolicy); } @@ -328,14 +327,13 @@ public class CuratorZookeeperClient implements Closeable } /** - * Returns true if connection timeouts should cause the retry policy to be checked. 
If false - * is returned, throw a connection exception without retrying + * Return the configured connection handling policy * - * @return true/false + * @return ConnectionHandlingPolicy */ - public boolean retryConnectionTimeouts() + public ConnectionHandlingPolicy getConnectionHandlingPolicy() { - return manageTimeouts; + return connectionHandlingPolicy; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/RetryLoop.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java b/curator-client/src/main/java/org/apache/curator/RetryLoop.java index 92291c1..35d55a1 100644 --- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java +++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java @@ -110,14 +110,30 @@ public class RetryLoop } client.internalBlockUntilConnectedOrTimedOut(); - if ( !client.isConnected() && !client.retryConnectionTimeouts() ) + + switch ( client.getConnectionHandlingPolicy().preRetry(client) ) { - connectionFailed = true; - break; + default: + case CALL_PROC: + { + result = proc.call(); + retryLoop.markComplete(); + break; + } + + case EXIT_RETRIES: + { + retryLoop.markComplete(); + break; + } + + case CONNECTION_TIMEOUT: + { + connectionFailed = true; + retryLoop.markComplete(); + break; + } } - - result = proc.call(); - retryLoop.markComplete(); } catch ( Exception e ) { http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java new file mode 100644 index 0000000..71dc065 --- /dev/null +++ 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java @@ -0,0 +1,48 @@ +package org.apache.curator.connection; + +import org.apache.curator.CuratorZookeeperClient; +import java.util.concurrent.Callable; + +public class ClassicConnectionHandlingPolicy implements ConnectionHandlingPolicy +{ + @Override + public boolean isEmulatingClassicHandling() + { + return true; + } + + @Override + public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception + { + CheckTimeoutsResult result = CheckTimeoutsResult.NOP; + int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs); + long elapsed = System.currentTimeMillis() - connectionStartMs; + if ( elapsed >= minTimeout ) + { + if ( hasNewConnectionString.call() ) + { + result = CheckTimeoutsResult.NEW_CONNECTION_STRING; + } + else + { + int maxTimeout = Math.max(sessionTimeoutMs, connectionTimeoutMs); + if ( elapsed > maxTimeout ) + { + result = CheckTimeoutsResult.RESET_CONNECTION; + } + else + { + result = CheckTimeoutsResult.CONNECTION_TIMEOUT; + } + } + } + + return result; + } + + @Override + public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception + { + return PreRetryResult.CALL_PROC; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java new file mode 100644 index 0000000..f3ecce6 --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java @@ -0,0 +1,84 @@ +package org.apache.curator.connection; + +import 
org.apache.curator.CuratorZookeeperClient; +import org.apache.zookeeper.KeeperException; +import java.util.concurrent.Callable; + +public interface ConnectionHandlingPolicy +{ + /** + * Return true if this policy should behave like the pre-3.0.0 version of Curator + * + * @return true/false + */ + boolean isEmulatingClassicHandling(); + + enum CheckTimeoutsResult + { + /** + * Do nothing + */ + NOP, + + /** + * handle a new connection string + */ + NEW_CONNECTION_STRING, + + /** + * reset/recreate the internal ZooKeeper connection + */ + RESET_CONNECTION, + + /** + * handle a connection timeout + */ + CONNECTION_TIMEOUT, + + /** + * handle a session timeout + */ + SESSION_TIMEOUT + } + + /** + * Check timeouts. NOTE: this method is only called when an attempt to access to the ZooKeeper instances + * is made and the connection has not completed. + * + * @param hasNewConnectionString proc to call to check if there is a new connection string. Important: the internal state is cleared after + * this is called so you MUST handle the new connection string if <tt>true</tt> is returned + * @param connectionStartMs the epoch/ms time that the connection was first initiated + * @param sessionTimeoutMs the configured session timeout in milliseconds + * @param connectionTimeoutMs the configured connection timeout in milliseconds + * @return result + * @throws Exception errors + */ + CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception; + + enum PreRetryResult + { + /** + * The retry loop should call the procedure + */ + CALL_PROC, + + /** + * Do not call the procedure and exit the retry loop + */ + EXIT_RETRIES, + + /** + * Do not call the procedure and throw {@link KeeperException.ConnectionLossException} + */ + CONNECTION_TIMEOUT + } + + /** + * Called prior to each iteration of a procedure in a retry loop + * + * @param client the client + * @return result + * 
@throws Exception errors + */ + PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java ---------------------------------------------------------------------- diff --git a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java new file mode 100644 index 0000000..06285ca --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java @@ -0,0 +1,35 @@ +package org.apache.curator.connection; + +import org.apache.curator.CuratorZookeeperClient; +import java.util.concurrent.Callable; + +public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy +{ + @Override + public boolean isEmulatingClassicHandling() + { + return false; + } + + @Override + public CheckTimeoutsResult checkTimeouts(Callable<Boolean> hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int connectionTimeoutMs) throws Exception + { + if ( hasNewConnectionString.call() ) + { + return CheckTimeoutsResult.NEW_CONNECTION_STRING; + } + return CheckTimeoutsResult.NOP; + } + + @Override + public PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception + { + // TODO - see if there are other servers to connect to + if ( !client.isConnected() ) + { + return PreRetryResult.CONNECTION_TIMEOUT; + } + + return PreRetryResult.CALL_PROC; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java ---------------------------------------------------------------------- diff --git a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java index 
871e4af..59c30ac 100644 --- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java +++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.curator.connection.ClassicConnectionHandlingPolicy; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.ZooKeeper; @@ -51,7 +52,7 @@ public class TestEnsurePath CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class); RetryPolicy retryPolicy = new RetryOneTime(1); RetryLoop retryLoop = new RetryLoop(retryPolicy, null); - when(curator.retryConnectionTimeouts()).thenReturn(true); + when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy()); when(curator.getZooKeeper()).thenReturn(client); when(curator.getRetryPolicy()).thenReturn(retryPolicy); when(curator.newRetryLoop()).thenReturn(retryLoop); @@ -77,7 +78,7 @@ public class TestEnsurePath RetryPolicy retryPolicy = new RetryOneTime(1); RetryLoop retryLoop = new RetryLoop(retryPolicy, null); final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class); - when(curator.retryConnectionTimeouts()).thenReturn(true); + when(curator.getConnectionHandlingPolicy()).thenReturn(new ClassicConnectionHandlingPolicy()); when(curator.getZooKeeper()).thenReturn(client); when(curator.getRetryPolicy()).thenReturn(retryPolicy); when(curator.newRetryLoop()).thenReturn(retryLoop); http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 
fad4fc2..01a8666 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -21,6 +21,9 @@ package org.apache.curator.framework; import com.google.common.collect.ImmutableList; import org.apache.curator.RetryPolicy; +import org.apache.curator.connection.ClassicConnectionHandlingPolicy; +import org.apache.curator.connection.ConnectionHandlingPolicy; +import org.apache.curator.connection.StandardConnectionHandlingPolicy; import org.apache.curator.ensemble.EnsembleProvider; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; import org.apache.curator.framework.api.ACLProvider; @@ -35,6 +38,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.utils.DefaultZookeeperFactory; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.net.InetAddress; @@ -117,7 +121,7 @@ public class CuratorFrameworkFactory private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true; - private boolean enableSessionExpiredState = Boolean.getBoolean("curator-enable-session-expired-state"); + private ConnectionHandlingPolicy connectionHandlingPolicy = Boolean.getBoolean("curator-use-classic-connection-handling") ? new ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy(); /** * Apply the current values and build a new CuratorFramework @@ -346,14 +350,50 @@ public class CuratorFrameworkFactory } /** - * Changes the meaning of {@link ConnectionState#LOST} from it's pre Curator 3.0.0 meaning - * to a true lost session state. See the {@link ConnectionState#LOST} doc for details. + * <p> + * Change the connection handling policy. 
The default policy is {@link StandardConnectionHandlingPolicy}. + * </p> + * <p> + * <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy has different behavior than the connection + * policy handling prior to version 3.0.0. You can specify that the connection handling be the method + * prior to 3.0.0 by passing in an instance of {@link ClassicConnectionHandlingPolicy} here or by + * setting the command line value "curator-use-classic-connection-handling" to true (e.g. <tt>-Dcurator-use-classic-connection-handling=true</tt>). + * </p> + * <p> + * Major differences from the older behavior are: + * </p> + * <ul> + * <li> + * Session/connection timeouts are no longer managed by the low-level client. They are managed + * by the CuratorFramework instance. There should be no noticeable differences. + * </li> + * <li> + * Prior to 3.0.0, an elapsed connection timeout would be presented to the retry policy, possibly + * causing retries. Now, elapsed connection timeouts are only retried if there is an another server + * in the connection string. i.e. a new instance will be retried should the retry policy allow a retry. + * If no other servers remain, a {@link KeeperException.ConnectionLossException} is thrown immediately + * without notifying the retry policy. + * </li> + * <li> + * <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link ConnectionState#LOST} did not imply + * a lost session (much to the confusion of users). Now, + * Curator will set the LOST state only when it believes that the ZooKeeper session + * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate + * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper + * connection. 
Now, Curator will set the LOST state when any of the following occurs: + * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED}; + * b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout + * elapses during a network partition. + * </li> + * </ul> * + * @param connectionHandlingPolicy the policy * @return this + * @since 3.0.0 */ - public Builder enableSessionExpiredState() + public Builder connectionHandlingPolicy(ConnectionHandlingPolicy connectionHandlingPolicy) { - this.enableSessionExpiredState = true; + this.connectionHandlingPolicy = connectionHandlingPolicy; return this; } @@ -412,9 +452,9 @@ public class CuratorFrameworkFactory return useContainerParentsIfAvailable; } - public boolean getEnableSessionExpiredState() + public ConnectionHandlingPolicy getConnectionHandlingPolicy() { - return enableSessionExpiredState; + return connectionHandlingPolicy; } @Deprecated http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index bcbeecd..44a8ec6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -84,7 +84,6 @@ public class CuratorFrameworkImpl implements CuratorFramework private final NamespaceFacadeCache namespaceFacadeCache; private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this); private final boolean useContainerParentsIfAvailable; - private final boolean enableSessionExpiredState; private final AtomicLong 
currentInstanceIndex = new AtomicLong(-1); private volatile ExecutorService executorService; @@ -107,24 +106,24 @@ public class CuratorFrameworkImpl implements CuratorFramework { ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory()); this.client = new CuratorZookeeperClient - ( - localZookeeperFactory, - builder.getEnsembleProvider(), - builder.getSessionTimeoutMs(), - builder.getConnectionTimeoutMs(), - new Watcher() - { - @Override - public void process(WatchedEvent watchedEvent) + ( + localZookeeperFactory, + builder.getEnsembleProvider(), + builder.getSessionTimeoutMs(), + builder.getConnectionTimeoutMs(), + new Watcher() { - CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); - processEvent(event); - } - }, - builder.getRetryPolicy(), - builder.canBeReadOnly(), - !builder.getEnableSessionExpiredState() // inverse is correct here. By default, CuratorZookeeperClient manages timeouts. The new SessionExpiredState needs this disabled. 
- ); + @Override + public void process(WatchedEvent watchedEvent) + { + CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null); + processEvent(event); + } + }, + builder.getRetryPolicy(), + builder.canBeReadOnly(), + builder.getConnectionHandlingPolicy() + ); listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>(); @@ -132,12 +131,11 @@ public class CuratorFrameworkImpl implements CuratorFramework namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); - connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getEnableSessionExpiredState(), builder.getSessionTimeoutMs()); + connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs()); compressionProvider = builder.getCompressionProvider(); aclProvider = builder.getAclProvider(); state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); - enableSessionExpiredState = builder.getEnableSessionExpiredState(); byte[] builderDefaultData = builder.getDefaultData(); defaultData = (builderDefaultData != null) ? 
Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0]; @@ -211,7 +209,6 @@ public class CuratorFrameworkImpl implements CuratorFramework state = parent.state; authInfos = parent.authInfos; useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable; - enableSessionExpiredState = parent.enableSessionExpiredState; } @Override @@ -699,7 +696,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private void checkNewConnection() { - if ( enableSessionExpiredState ) + if ( !client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) { long instanceIndex = client.getInstanceIndex(); long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex); @@ -752,7 +749,7 @@ public class CuratorFrameworkImpl implements CuratorFramework return; } - if ( !enableSessionExpiredState ) + if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() ) { doSyncForSuspendedConnection(client.getInstanceIndex()); } http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java index 79f3b62..fe40abf 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java @@ -64,32 +64,18 @@ public enum ConnectionState /** * <p> - * NOTE: the meaning of this state depends on how your CuratorFramework instance - * is created. - * </p> - * - * <p> - * The default meaning of LOST (and the only meaning prior to Curator 3.0.0) is: - * The connection is confirmed to be lost (i.e. the retry policy has given up). Close any locks, leaders, etc. 
and - * attempt to re-create them. NOTE: it is possible to get a {@link #RECONNECTED} - * state after this but you should still consider any locks, etc. as dirty/unstable - * </p> - * - * <p> - * <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by calling - * {@link CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode, * Curator will set the LOST state only when it believes that the ZooKeeper session * has expired. ZooKeeper connections have a session. When the session expires, clients must take appropriate * action. In Curator, this is complicated by the fact that Curator internally manages the ZooKeeper - * connection. In this mode, Curator will set the LOST state when any of the following occurs: + * connection. Curator will set the LOST state when any of the following occurs: * a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or {@link KeeperException.Code#SESSIONEXPIRED}; * b) Curator closes the internally managed ZooKeeper instance; c) The configured session timeout * elapses during a network partition. * </p> * * <p> - * NOTE: the new behavior for the LOST state can also be enabled via the command line - * property "curator-enable-session-expired-state" (e.g. 
-Dcurator-enable-session-expired-state=true) + * NOTE: see {@link CuratorFrameworkFactory.Builder#connectionHandlingPolicy} for an important note about a + * change in meaning to LOST since 3.0.0 * </p> */ LOST http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 52e0d07..2e7492f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -21,6 +21,7 @@ package org.apache.curator.framework.state; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import org.apache.curator.connection.ConnectionHandlingPolicyStyle; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.utils.ThreadUtils; @@ -35,7 +36,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** @@ -66,7 +66,6 @@ public class ConnectionStateManager implements Closeable private final Logger log = LoggerFactory.getLogger(getClass()); private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE); private final CuratorFramework client; - private final boolean enableSessionExpiredState; private final int sessionTimeoutMs; private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>(); private final AtomicBoolean initialConnectMessageSent = new AtomicBoolean(false); @@ -88,13 +87,11 @@ public class ConnectionStateManager implements Closeable /** * @param client the client * @param threadFactory thread factory to use or null for a default - * @param enableSessionExpiredState if true, applies new meaning for LOST as described here: {@link ConnectionState#LOST} * @param sessionTimeoutMs the ZK session timeout in milliseconds */ - public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs) + public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs) { this.client = client; - this.enableSessionExpiredState = enableSessionExpiredState; this.sessionTimeoutMs = sessionTimeoutMs; if ( threadFactory == null ) { @@ -273,7 +270,7 @@ public class ConnectionStateManager implements Closeable } ); } - else if ( enableSessionExpiredState ) + else if ( !client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling() ) { synchronized(this) { http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java index cd415b1..4d6f473 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java @@ -53,7 +53,6 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests .connectString(server.getConnectString()) 
.connectionTimeoutMs(timing.connection()) .sessionTimeoutMs(timing.session()) - .enableSessionExpiredState() .retryPolicy(new RetryOneTime(1)) .build(); client.start(); @@ -115,7 +114,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED); server.stop(); Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); - Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST); } @Test @@ -125,7 +124,7 @@ public class TestEnabledSessionExpiredState extends BaseClassForTests server.stop(); timing.sleepForSession(); Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED); - Assert.assertEquals(states.poll(timing.multiple(2).session(), TimeUnit.MILLISECONDS), ConnectionState.LOST); + Assert.assertEquals(states.poll(timing.sessionSleep(), TimeUnit.MILLISECONDS), ConnectionState.LOST); server.restart(); client.checkExists().forPath("/"); Assert.assertEquals(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED); http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index c9f3524..1f6503d 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -86,14 +86,14 @@ public class BaseClassForTests public void beforeInvocation(IInvokedMethod method, ITestResult 
testResult) { int invocationCount = method.getTestMethod().getCurrentInvocationCount(); - System.setProperty("curator-enable-session-expired-state", Boolean.toString(invocationCount == 1)); - log.info("curator-enable-session-expired-state: " + Boolean.toString(invocationCount == 1)); + System.setProperty("curator-use-classic-connection-handling", Boolean.toString(invocationCount == 1)); + log.info("curator-use-classic-connection-handling: " + Boolean.toString(invocationCount == 1)); } @Override public void afterInvocation(IInvokedMethod method, ITestResult testResult) { - System.clearProperty("curator-enable-session-expired-state"); + System.clearProperty("curator-use-classic-connection-handling"); } }; context.getSuite().addListener(listener);