Major refactoring: abstracted the old/new behavior into a pluggable 
ConnectionHandlingPolicy. Also, IMPORTANT: made the new behavior the default. 
This needs to be discussed, but it's a major improvement and we should default 
to it.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e2391370
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e2391370
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e2391370

Branch: refs/heads/CURATOR-248
Commit: e239137019608f02cabb23c27ab13adcef88c027
Parents: 6381ccb
Author: randgalt <randg...@apache.org>
Authored: Sat Aug 22 19:06:55 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Aug 22 19:06:55 2015 -0500

----------------------------------------------------------------------
 .../org/apache/curator/ConnectionState.java     | 85 ++++++++++++--------
 .../apache/curator/CuratorZookeeperClient.java  | 32 ++++----
 .../main/java/org/apache/curator/RetryLoop.java | 28 +++++--
 .../ClassicConnectionHandlingPolicy.java        | 48 +++++++++++
 .../connection/ConnectionHandlingPolicy.java    | 84 +++++++++++++++++++
 .../StandardConnectionHandlingPolicy.java       | 35 ++++++++
 .../java/org/apache/curator/TestEnsurePath.java |  5 +-
 .../framework/CuratorFrameworkFactory.java      | 54 +++++++++++--
 .../framework/imps/CuratorFrameworkImpl.java    | 43 +++++-----
 .../framework/state/ConnectionState.java        | 20 +----
 .../framework/state/ConnectionStateManager.java |  9 +--
 .../imps/TestEnabledSessionExpiredState.java    |  5 +-
 .../apache/curator/test/BaseClassForTests.java  |  6 +-
 13 files changed, 336 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/ConnectionState.java 
b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index c3d6921..d6ddd33 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -18,9 +18,10 @@
  */
 package org.apache.curator;
 
-import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
 import org.apache.curator.drivers.TracerDriver;
 import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.KeeperException;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,18 +50,19 @@ class ConnectionState implements Watcher, Closeable
     private final int sessionTimeoutMs;
     private final int connectionTimeoutMs;
     private final AtomicReference<TracerDriver> tracer;
+    private final ConnectionHandlingPolicy connectionHandlingPolicy;
     private final Queue<Exception> backgroundExceptions = new 
ConcurrentLinkedQueue<Exception>();
     private final Queue<Watcher> parentWatchers = new 
ConcurrentLinkedQueue<Watcher>();
     private final AtomicLong instanceIndex = new AtomicLong();
     private volatile long connectionStartMs = 0;
-    private final AtomicBoolean enableTimeoutChecks = new AtomicBoolean(true);
 
-    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider 
ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher 
parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
+    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider 
ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher 
parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, 
ConnectionHandlingPolicy connectionHandlingPolicy)
     {
         this.ensembleProvider = ensembleProvider;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.connectionTimeoutMs = connectionTimeoutMs;
         this.tracer = tracer;
+        this.connectionHandlingPolicy = connectionHandlingPolicy;
         if ( parentWatcher != null )
         {
             parentWatchers.offer(parentWatcher);
@@ -68,11 +71,6 @@ class ConnectionState implements Watcher, Closeable
         zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, 
sessionTimeoutMs, canBeReadOnly);
     }
 
-    void disableTimeoutChecks()
-    {
-        enableTimeoutChecks.set(false);
-    }
-
     ZooKeeper getZooKeeper() throws Exception
     {
         if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
@@ -87,13 +85,10 @@ class ConnectionState implements Watcher, Closeable
             throw exception;
         }
 
-        if ( enableTimeoutChecks.get() )
+        boolean localIsConnected = isConnected.get();
+        if ( !localIsConnected )
         {
-            boolean localIsConnected = isConnected.get();
-            if ( !localIsConnected )
-            {
-                checkTimeouts();
-            }
+            checkTimeouts();
         }
 
         return zooKeeper.getZooKeeper();
@@ -194,35 +189,57 @@ class ConnectionState implements Watcher, Closeable
 
     private synchronized void checkTimeouts() throws Exception
     {
-        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
-        long elapsed = System.currentTimeMillis() - connectionStartMs;
-        if ( elapsed >= minTimeout )
+        Callable<Boolean> hasNewConnectionString  = new Callable<Boolean>()
+        {
+            @Override
+            public Boolean call()
+            {
+                return zooKeeper.hasNewConnectionString();
+            }
+        };
+        ConnectionHandlingPolicy.CheckTimeoutsResult result = 
connectionHandlingPolicy.checkTimeouts(hasNewConnectionString, 
connectionStartMs, sessionTimeoutMs, connectionTimeoutMs);
+        switch ( result )
         {
-            if ( zooKeeper.hasNewConnectionString() )
+            default:
+            case NOP:
+            {
+                break;
+            }
+
+            case NEW_CONNECTION_STRING:
             {
                 handleNewConnectionString();
+                break;
             }
-            else
+
+            case RESET_CONNECTION:
             {
-                int maxTimeout = Math.max(sessionTimeoutMs, 
connectionTimeoutMs);
-                if ( elapsed > maxTimeout )
+                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                 {
-                    if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                    {
-                        log.warn(String.format("Connection attempt 
unsuccessful after %d (greater than max timeout of %d). Resetting connection 
and trying again with a new connection.", elapsed, maxTimeout));
-                    }
-                    reset();
+                    long elapsed = System.currentTimeMillis() - 
connectionStartMs;
+                    int maxTimeout = Math.max(sessionTimeoutMs, 
connectionTimeoutMs);
+                    log.warn(String.format("Connection attempt unsuccessful 
after %d (greater than max timeout of %d). Resetting connection and trying 
again with a new connection.", elapsed, maxTimeout));
                 }
-                else
+                reset();
+                break;
+            }
+
+            case CONNECTION_TIMEOUT:
+            {
+                KeeperException.ConnectionLossException 
connectionLossException = new CuratorConnectionLossException();
+                if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                 {
-                    KeeperException.ConnectionLossException 
connectionLossException = new CuratorConnectionLossException();
-                    if ( 
!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
-                    {
-                        log.error(String.format("Connection timed out for 
connection string (%s) and timeout (%d) / elapsed (%d)", 
zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), 
connectionLossException);
-                    }
-                    tracer.get().addCount("connections-timed-out", 1);
-                    throw connectionLossException;
+                    long elapsed = System.currentTimeMillis() - 
connectionStartMs;
+                    log.error(String.format("Connection timed out for 
connection string (%s) and timeout (%d) / elapsed (%d)", 
zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), 
connectionLossException);
                 }
+                tracer.get().addCount("connections-timed-out", 1);
+                throw connectionLossException;
+            }
+
+            case SESSION_TIMEOUT:
+            {
+                handleExpiredSession();
+                break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index a065d78..9342acf 100644
--- 
a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ 
b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -21,6 +21,8 @@ package org.apache.curator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
 import org.apache.curator.drivers.TracerDriver;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
@@ -51,7 +53,7 @@ public class CuratorZookeeperClient implements Closeable
     private final int connectionTimeoutMs;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicReference<TracerDriver> tracer = new 
AtomicReference<TracerDriver>(new DefaultTracerDriver());
-    private final boolean manageTimeouts;
+    private final ConnectionHandlingPolicy connectionHandlingPolicy;
     private final AtomicReference<Exception> debugException = new 
AtomicReference<>();
 
     /**
@@ -64,7 +66,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(String connectString, int sessionTimeoutMs, 
int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), new 
FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, 
watcher, retryPolicy, false, true);
+        this(new DefaultZookeeperFactory(), new 
FixedEnsembleProvider(connectString), sessionTimeoutMs, connectionTimeoutMs, 
watcher, retryPolicy, false, new ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -76,7 +78,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(EnsembleProvider ensembleProvider, int 
sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy 
retryPolicy)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, true);
+        this(new DefaultZookeeperFactory(), ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, false, new 
ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -93,7 +95,7 @@ public class CuratorZookeeperClient implements Closeable
      */
     public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, 
EnsembleProvider ensembleProvider, int sessionTimeoutMs, int 
connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean 
canBeReadOnly)
     {
-        this(new DefaultZookeeperFactory(), ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, 
true);
+        this(new DefaultZookeeperFactory(), ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, retryPolicy, canBeReadOnly, new 
ClassicConnectionHandlingPolicy());
     }
 
     /**
@@ -107,11 +109,12 @@ public class CuratorZookeeperClient implements Closeable
      *                      read only mode in case of a network partition. See
      *                      {@link ZooKeeper#ZooKeeper(String, int, Watcher, 
long, byte[], boolean)}
      *                      for details
-     * @param manageTimeouts in general, Curator clients try to manage 
session/connection timeouts. If this is false, that management is turned off
+     * @param connectionHandlingPolicy connection handling policy - use one of 
the pre-defined policies or write your own
+     * @since 3.0.0
      */
-    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, 
EnsembleProvider ensembleProvider, int sessionTimeoutMs, int 
connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean 
canBeReadOnly, boolean manageTimeouts)
+    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, 
EnsembleProvider ensembleProvider, int sessionTimeoutMs, int 
connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean 
canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
     {
-        this.manageTimeouts = manageTimeouts;
+        this.connectionHandlingPolicy = connectionHandlingPolicy;
         if ( sessionTimeoutMs < connectionTimeoutMs )
         {
             log.warn(String.format("session timeout [%d] is less than 
connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
@@ -121,11 +124,7 @@ public class CuratorZookeeperClient implements Closeable
         ensembleProvider = Preconditions.checkNotNull(ensembleProvider, 
"ensembleProvider cannot be null");
 
         this.connectionTimeoutMs = connectionTimeoutMs;
-        state = new ConnectionState(zookeeperFactory, ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
-        if ( !manageTimeouts )
-        {
-            state.disableTimeoutChecks();
-        }
+        state = new ConnectionState(zookeeperFactory, ensembleProvider, 
sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, 
connectionHandlingPolicy);
         setRetryPolicy(retryPolicy);
     }
 
@@ -328,14 +327,13 @@ public class CuratorZookeeperClient implements Closeable
     }
 
     /**
-     * Returns true if connection timeouts should cause the retry policy to be 
checked. If false
-     * is returned, throw a connection exception without retrying
+     * Return the configured connection handling policy
      *
-     * @return true/false
+     * @return ConnectionHandlingPolicy
      */
-    public boolean retryConnectionTimeouts()
+    public ConnectionHandlingPolicy getConnectionHandlingPolicy()
     {
-        return manageTimeouts;
+        return connectionHandlingPolicy;
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/RetryLoop.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/RetryLoop.java 
b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
index 92291c1..35d55a1 100644
--- a/curator-client/src/main/java/org/apache/curator/RetryLoop.java
+++ b/curator-client/src/main/java/org/apache/curator/RetryLoop.java
@@ -110,14 +110,30 @@ public class RetryLoop
                 }
 
                 client.internalBlockUntilConnectedOrTimedOut();
-                if ( !client.isConnected() && 
!client.retryConnectionTimeouts() )
+
+                switch ( client.getConnectionHandlingPolicy().preRetry(client) 
)
                 {
-                    connectionFailed = true;
-                    break;
+                    default:
+                    case CALL_PROC:
+                    {
+                        result = proc.call();
+                        retryLoop.markComplete();
+                        break;
+                    }
+
+                    case EXIT_RETRIES:
+                    {
+                        retryLoop.markComplete();
+                        break;
+                    }
+
+                    case CONNECTION_TIMEOUT:
+                    {
+                        connectionFailed = true;
+                        retryLoop.markComplete();
+                        break;
+                    }
                 }
-
-                result = proc.call();
-                retryLoop.markComplete();
             }
             catch ( Exception e )
             {

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
new file mode 100644
index 0000000..71dc065
--- /dev/null
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -0,0 +1,48 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import java.util.concurrent.Callable;
+
+public class ClassicConnectionHandlingPolicy implements 
ConnectionHandlingPolicy
+{
+    @Override
+    public boolean isEmulatingClassicHandling()
+    {
+        return true;
+    }
+
+    @Override
+    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
+    {
+        CheckTimeoutsResult result = CheckTimeoutsResult.NOP;
+        int minTimeout = Math.min(sessionTimeoutMs, connectionTimeoutMs);
+        long elapsed = System.currentTimeMillis() - connectionStartMs;
+        if ( elapsed >= minTimeout )
+        {
+            if ( hasNewConnectionString.call() )
+            {
+                result = CheckTimeoutsResult.NEW_CONNECTION_STRING;
+            }
+            else
+            {
+                int maxTimeout = Math.max(sessionTimeoutMs, 
connectionTimeoutMs);
+                if ( elapsed > maxTimeout )
+                {
+                    result = CheckTimeoutsResult.RESET_CONNECTION;
+                }
+                else
+                {
+                    result = CheckTimeoutsResult.CONNECTION_TIMEOUT;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public PreRetryResult preRetry(CuratorZookeeperClient client) throws 
Exception
+    {
+        return PreRetryResult.CALL_PROC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
new file mode 100644
index 0000000..f3ecce6
--- /dev/null
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -0,0 +1,84 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.zookeeper.KeeperException;
+import java.util.concurrent.Callable;
+
+public interface ConnectionHandlingPolicy
+{
+    /**
+     * Return true if this policy should behave like the pre-3.0.0 version of 
Curator
+     *
+     * @return true/false
+     */
+    boolean isEmulatingClassicHandling();
+
+    enum CheckTimeoutsResult
+    {
+        /**
+         * Do nothing
+         */
+        NOP,
+
+        /**
+         * handle a new connection string
+         */
+        NEW_CONNECTION_STRING,
+
+        /**
+         * reset/recreate the internal ZooKeeper connection
+         */
+        RESET_CONNECTION,
+
+        /**
+         * handle a connection timeout
+         */
+        CONNECTION_TIMEOUT,
+
+        /**
+         * handle a session timeout
+         */
+        SESSION_TIMEOUT
+    }
+
+    /**
+     * Check timeouts. NOTE: this method is only called when an attempt to 
access the ZooKeeper instance
+     * is made and the connection has not completed.
+     *
+     * @param hasNewConnectionString proc to call to check if there is a new 
connection string. Important: the internal state is cleared after
+     *                               this is called so you MUST handle the new 
connection string if <tt>true</tt> is returned
+     * @param connectionStartMs the epoch/ms time that the connection was 
first initiated
+     * @param sessionTimeoutMs the configured session timeout in milliseconds
+     * @param connectionTimeoutMs the configured connection timeout in 
milliseconds
+     * @return result
+     * @throws Exception errors
+     */
+    CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception;
+
+    enum PreRetryResult
+    {
+        /**
+         * The retry loop should call the procedure
+         */
+        CALL_PROC,
+
+        /**
+         * Do not call the procedure and exit the retry loop
+         */
+        EXIT_RETRIES,
+
+        /**
+         * Do not call the procedure and throw {@link 
KeeperException.ConnectionLossException}
+         */
+        CONNECTION_TIMEOUT
+    }
+
+    /**
+     * Called prior to each iteration of a procedure in a retry loop
+     *
+     * @param client the client
+     * @return result
+     * @throws Exception errors
+     */
+    PreRetryResult preRetry(CuratorZookeeperClient client) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
new file mode 100644
index 0000000..06285ca
--- /dev/null
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -0,0 +1,35 @@
+package org.apache.curator.connection;
+
+import org.apache.curator.CuratorZookeeperClient;
+import java.util.concurrent.Callable;
+
+public class StandardConnectionHandlingPolicy implements 
ConnectionHandlingPolicy
+{
+    @Override
+    public boolean isEmulatingClassicHandling()
+    {
+        return false;
+    }
+
+    @Override
+    public CheckTimeoutsResult checkTimeouts(Callable<Boolean> 
hasNewConnectionString, long connectionStartMs, int sessionTimeoutMs, int 
connectionTimeoutMs) throws Exception
+    {
+        if ( hasNewConnectionString.call() )
+        {
+            return CheckTimeoutsResult.NEW_CONNECTION_STRING;
+        }
+        return CheckTimeoutsResult.NOP;
+    }
+
+    @Override
+    public PreRetryResult preRetry(CuratorZookeeperClient client) throws 
Exception
+    {
+        // TODO - see if there are other servers to connect to
+        if ( !client.isConnected() )
+        {
+            return PreRetryResult.CONNECTION_TIMEOUT;
+        }
+
+        return PreRetryResult.CALL_PROC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java 
b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
index 871e4af..59c30ac 100644
--- a/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
+++ b/curator-client/src/test/java/org/apache/curator/TestEnsurePath.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.ZooKeeper;
@@ -51,7 +52,7 @@ public class TestEnsurePath
         CuratorZookeeperClient  curator = mock(CuratorZookeeperClient.class);
         RetryPolicy             retryPolicy = new RetryOneTime(1);
         RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
-        when(curator.retryConnectionTimeouts()).thenReturn(true);
+        when(curator.getConnectionHandlingPolicy()).thenReturn(new 
ClassicConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
         when(curator.getRetryPolicy()).thenReturn(retryPolicy);
         when(curator.newRetryLoop()).thenReturn(retryLoop);
@@ -77,7 +78,7 @@ public class TestEnsurePath
         RetryPolicy             retryPolicy = new RetryOneTime(1);
         RetryLoop               retryLoop = new RetryLoop(retryPolicy, null);
         final CuratorZookeeperClient  curator = 
mock(CuratorZookeeperClient.class);
-        when(curator.retryConnectionTimeouts()).thenReturn(true);
+        when(curator.getConnectionHandlingPolicy()).thenReturn(new 
ClassicConnectionHandlingPolicy());
         when(curator.getZooKeeper()).thenReturn(client);
         when(curator.getRetryPolicy()).thenReturn(retryPolicy);
         when(curator.newRetryLoop()).thenReturn(retryLoop);

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index fad4fc2..01a8666 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -21,6 +21,9 @@ package org.apache.curator.framework;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.curator.RetryPolicy;
+import org.apache.curator.connection.ClassicConnectionHandlingPolicy;
+import org.apache.curator.connection.ConnectionHandlingPolicy;
+import org.apache.curator.connection.StandardConnectionHandlingPolicy;
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.framework.api.ACLProvider;
@@ -35,6 +38,7 @@ import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import java.net.InetAddress;
@@ -117,7 +121,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
-        private boolean enableSessionExpiredState = 
Boolean.getBoolean("curator-enable-session-expired-state");
+        private ConnectionHandlingPolicy connectionHandlingPolicy = 
Boolean.getBoolean("curator-use-classic-connection-handling") ? new 
ClassicConnectionHandlingPolicy() : new StandardConnectionHandlingPolicy();
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -346,14 +350,50 @@ public class CuratorFrameworkFactory
         }
 
         /**
-         * Changes the meaning of {@link ConnectionState#LOST} from it's pre 
Curator 3.0.0 meaning
-         * to a true lost session state. See the {@link ConnectionState#LOST} 
doc for details.
+         * <p>
+         *     Change the connection handling policy. The default policy is 
{@link StandardConnectionHandlingPolicy}.
+         * </p>
+         * <p>
+         *     <strong>IMPORTANT: </strong> StandardConnectionHandlingPolicy 
has different behavior than the connection
+         *     policy handling prior to version 3.0.0. You can specify that 
the connection handling be the method
+         *     prior to 3.0.0 by passing in an instance of {@link 
ClassicConnectionHandlingPolicy} here or by
+         *     setting the JVM system property 
"curator-use-classic-connection-handling" to true (e.g. 
<tt>-Dcurator-use-classic-connection-handling=true</tt>).
+         * </p>
+         * <p>
+         *     Major differences from the older behavior are:
+         * </p>
+         * <ul>
+         *     <li>
+         *         Session/connection timeouts are no longer managed by the 
low-level client. They are managed
+         *         by the CuratorFramework instance. There should be no 
noticeable differences.
+         *     </li>
+         *     <li>
+         *         Prior to 3.0.0, an elapsed connection timeout would be 
presented to the retry policy, possibly
+         *         causing retries. Now, elapsed connection timeouts are only 
retried if there is another server
+         *         in the connection string. i.e. a new instance will be 
retried should the retry policy allow a retry.
+         *         If no other servers remain, a {@link 
KeeperException.ConnectionLossException} is thrown immediately
+         *         without notifying the retry policy.
+         *     </li>
+         *     <li>
+         *         <strong>MOST IMPORTANTLY!</strong> Prior to 3.0.0, {@link 
ConnectionState#LOST} did not imply
+         *         a lost session (much to the confusion of users). Now,
+         *         Curator will set the LOST state only when it believes that 
the ZooKeeper session
+         *         has expired. ZooKeeper connections have a session. When the 
session expires, clients must take appropriate
+         *         action. In Curator, this is complicated by the fact that 
Curator internally manages the ZooKeeper
+         *         connection. Now, Curator will set the LOST state when any 
of the following occurs:
+         *         a) ZooKeeper returns a {@link 
Watcher.Event.KeeperState#Expired} or {@link 
KeeperException.Code#SESSIONEXPIRED};
+         *         b) Curator closes the internally managed ZooKeeper 
instance; c) The configured session timeout
+         *         elapses during a network partition.
+         *     </li>
+         * </ul>
          *
+         * @param connectionHandlingPolicy the policy
          * @return this
+         * @since 3.0.0
          */
-        public Builder enableSessionExpiredState()
+        public Builder connectionHandlingPolicy(ConnectionHandlingPolicy 
connectionHandlingPolicy)
         {
-            this.enableSessionExpiredState = true;
+            this.connectionHandlingPolicy = connectionHandlingPolicy;
             return this;
         }
 
@@ -412,9 +452,9 @@ public class CuratorFrameworkFactory
             return useContainerParentsIfAvailable;
         }
 
-        public boolean getEnableSessionExpiredState()
+        public ConnectionHandlingPolicy getConnectionHandlingPolicy()
         {
-            return enableSessionExpiredState;
+            return connectionHandlingPolicy;
         }
 
         @Deprecated

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index bcbeecd..44a8ec6 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -84,7 +84,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new 
NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
-    private final boolean enableSessionExpiredState;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
 
     private volatile ExecutorService executorService;
@@ -107,24 +106,24 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
     {
         ZookeeperFactory localZookeeperFactory = 
makeZookeeperFactory(builder.getZookeeperFactory());
         this.client = new CuratorZookeeperClient
-        (
-            localZookeeperFactory,
-            builder.getEnsembleProvider(),
-            builder.getSessionTimeoutMs(),
-            builder.getConnectionTimeoutMs(),
-            new Watcher()
-            {
-                @Override
-                public void process(WatchedEvent watchedEvent)
+            (
+                localZookeeperFactory,
+                builder.getEnsembleProvider(),
+                builder.getSessionTimeoutMs(),
+                builder.getConnectionTimeoutMs(),
+                new Watcher()
                 {
-                    CuratorEvent event = new 
CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, 
watchedEvent.getState().getIntValue(), 
unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, 
watchedEvent, null, null);
-                    processEvent(event);
-                }
-            },
-            builder.getRetryPolicy(),
-            builder.canBeReadOnly(),
-            !builder.getEnableSessionExpiredState() // inverse is correct 
here. By default, CuratorZookeeperClient manages timeouts. The new 
SessionExpiredState needs this disabled.
-        );
+                    @Override
+                    public void process(WatchedEvent watchedEvent)
+                    {
+                        CuratorEvent event = new 
CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, 
watchedEvent.getState().getIntValue(), 
unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, 
watchedEvent, null, null);
+                        processEvent(event);
+                    }
+                },
+                builder.getRetryPolicy(),
+                builder.canBeReadOnly(),
+                builder.getConnectionHandlingPolicy()
+            );
 
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new 
ListenerContainer<UnhandledErrorListener>();
@@ -132,12 +131,11 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getEnableSessionExpiredState(), 
builder.getSessionTimeoutMs());
+        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getSessionTimeoutMs());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new 
AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = 
builder.useContainerParentsIfAvailable();
-        enableSessionExpiredState = builder.getEnableSessionExpiredState();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? 
Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -211,7 +209,6 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
-        enableSessionExpiredState = parent.enableSessionExpiredState;
     }
 
     @Override
@@ -699,7 +696,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
 
     private void checkNewConnection()
     {
-        if ( enableSessionExpiredState )
+        if ( 
!client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
         {
             long instanceIndex = client.getInstanceIndex();
             long newInstanceIndex = 
currentInstanceIndex.getAndSet(instanceIndex);
@@ -752,7 +749,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
             return;
         }
 
-        if ( !enableSessionExpiredState )
+        if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() 
)
         {
             doSyncForSuspendedConnection(client.getInstanceIndex());
         }

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 79f3b62..fe40abf 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -64,32 +64,18 @@ public enum ConnectionState
 
     /**
      * <p>
-     *     NOTE: the meaning of this state depends on how your 
CuratorFramework instance
-     *     is created.
-     * </p>
-     *
-     * <p>
-     *     The default meaning of LOST (and the only meaning prior to Curator 
3.0.0) is:
-     *     The connection is confirmed to be lost (i.e. the retry policy has 
given up). Close any locks, leaders, etc. and
-     *     attempt to re-create them. NOTE: it is possible to get a {@link 
#RECONNECTED}
-     *     state after this but you should still consider any locks, etc. as 
dirty/unstable
-     * </p>
-     *
-     * <p>
-     *     <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by 
calling
-     *     {@link 
CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode,
      *     Curator will set the LOST state only when it believes that the 
ZooKeeper session
      *     has expired. ZooKeeper connections have a session. When the session 
expires, clients must take appropriate
      *     action. In Curator, this is complicated by the fact that Curator 
internally manages the ZooKeeper
-     *     connection. In this mode, Curator will set the LOST state when any 
of the following occurs:
+     *     connection. Curator will set the LOST state when any of the 
following occurs:
      *     a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or 
{@link KeeperException.Code#SESSIONEXPIRED};
      *     b) Curator closes the internally managed ZooKeeper instance; c) The 
configured session timeout
      *     elapses during a network partition.
      * </p>
      *
      * <p>
-     *     NOTE: the new behavior for the LOST state can also be enabled via 
the command line
-     *     property "curator-enable-session-expired-state" (e.g. 
-Dcurator-enable-session-expired-state=true)
+     *     NOTE: see {@link 
CuratorFrameworkFactory.Builder#connectionHandlingPolicy} for an important note 
about a
+     *     change in meaning to LOST since 3.0.0
      * </p>
      */
     LOST

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 52e0d07..2e7492f 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.state;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import org.apache.curator.connection.ConnectionHandlingPolicyStyle;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.utils.ThreadUtils;
@@ -35,7 +36,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -66,7 +66,6 @@ public class ConnectionStateManager implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BlockingQueue<ConnectionState> eventQueue = new 
ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
-    private final boolean enableSessionExpiredState;
     private final int sessionTimeoutMs;
     private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
@@ -88,13 +87,11 @@ public class ConnectionStateManager implements Closeable
     /**
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
-     * @param enableSessionExpiredState if true, applies new meaning for LOST 
as described here: {@link ConnectionState#LOST}
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, int sessionTimeoutMs)
     {
         this.client = client;
-        this.enableSessionExpiredState = enableSessionExpiredState;
         this.sessionTimeoutMs = sessionTimeoutMs;
         if ( threadFactory == null )
         {
@@ -273,7 +270,7 @@ public class ConnectionStateManager implements Closeable
                             }
                         );
                 }
-                else if ( enableSessionExpiredState )
+                else if ( 
!client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling()
 )
                 {
                     synchronized(this)
                     {

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
index cd415b1..4d6f473 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestEnabledSessionExpiredState.java
@@ -53,7 +53,6 @@ public class TestEnabledSessionExpiredState extends 
BaseClassForTests
             .connectString(server.getConnectString())
             .connectionTimeoutMs(timing.connection())
             .sessionTimeoutMs(timing.session())
-            .enableSessionExpiredState()
             .retryPolicy(new RetryOneTime(1))
             .build();
         client.start();
@@ -115,7 +114,7 @@ public class TestEnabledSessionExpiredState extends 
BaseClassForTests
         Assert.assertEquals(states.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED);
         server.stop();
         Assert.assertEquals(states.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.multiple(2).session(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.sessionSleep(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
     }
 
     @Test
@@ -125,7 +124,7 @@ public class TestEnabledSessionExpiredState extends 
BaseClassForTests
         server.stop();
         timing.sleepForSession();
         Assert.assertEquals(states.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED);
-        Assert.assertEquals(states.poll(timing.multiple(2).session(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
+        Assert.assertEquals(states.poll(timing.sessionSleep(), 
TimeUnit.MILLISECONDS), ConnectionState.LOST);
         server.restart();
         client.checkExists().forPath("/");
         Assert.assertEquals(states.poll(timing.milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED);

http://git-wip-us.apache.org/repos/asf/curator/blob/e2391370/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
----------------------------------------------------------------------
diff --git 
a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java 
b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index c9f3524..1f6503d 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -86,14 +86,14 @@ public class BaseClassForTests
                 public void beforeInvocation(IInvokedMethod method, 
ITestResult testResult)
                 {
                     int invocationCount = 
method.getTestMethod().getCurrentInvocationCount();
-                    System.setProperty("curator-enable-session-expired-state", 
Boolean.toString(invocationCount == 1));
-                    log.info("curator-enable-session-expired-state: " + 
Boolean.toString(invocationCount == 1));
+                    
System.setProperty("curator-use-classic-connection-handling", 
Boolean.toString(invocationCount == 1));
+                    log.info("curator-use-classic-connection-handling: " + 
Boolean.toString(invocationCount == 1));
                 }
 
                 @Override
                 public void afterInvocation(IInvokedMethod method, ITestResult 
testResult)
                 {
-                    
System.clearProperty("curator-enable-session-expired-state");
+                    
System.clearProperty("curator-use-classic-connection-handling");
                 }
             };
             context.getSuite().addListener(listener);

Reply via email to