Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 24de710d2 -> 64bb8841a


Added an option for Curator's simulated session expiration to trigger at a
configurable percentage of the negotiated session timeout. This is meant to
account for timing/network differences between the client and server.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/64bb8841
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/64bb8841
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/64bb8841

Branch: refs/heads/CURATOR-3.0
Commit: 64bb8841a39e1d82de091d23d1865ba47236e5ad
Parents: 24de710
Author: randgalt <randg...@apache.org>
Authored: Mon Sep 14 21:04:36 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Mon Sep 14 21:04:36 2015 -0500

----------------------------------------------------------------------
 .../ClassicConnectionHandlingPolicy.java        |  4 +--
 .../connection/ConnectionHandlingPolicy.java    | 27 +++++++++++++++++---
 .../StandardConnectionHandlingPolicy.java       | 17 ++++++++++--
 .../imps/ClassicInternalConnectionHandler.java  |  6 -----
 .../framework/imps/CuratorFrameworkImpl.java    |  7 ++---
 .../imps/InternalConnectionHandler.java         |  2 --
 .../imps/StandardInternalConnectionHandler.java |  6 -----
 .../framework/state/ConnectionStateManager.java | 13 +++++-----
 8 files changed, 52 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
index e4c59f4..8116308 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ClassicConnectionHandlingPolicy.java
@@ -28,9 +28,9 @@ import java.util.concurrent.Callable;
 public class ClassicConnectionHandlingPolicy implements 
ConnectionHandlingPolicy
 {
     @Override
-    public boolean isEmulatingClassicHandling()
+    public int getSimulatedSessionExpirationPercent()
     {
-        return true;
+        return 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
index ae77861..c47577d 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/ConnectionHandlingPolicy.java
@@ -29,11 +29,32 @@ import java.util.concurrent.Callable;
 public interface ConnectionHandlingPolicy
 {
     /**
-     * Return true if this policy should behave like the pre-3.0.0 version of 
Curator
+     * <p>
+     *     Prior to 3.0.0, Curator did not try to manage session expiration
+     *     other than the functionality provided by ZooKeeper itself. Starting 
with
+     *     3.0.0, Curator has the option of attempting to monitor session 
expiration
+     *     above what is provided by ZooKeeper. The percentage returned by 
this method
+     *     determines how and if Curator will check for session expiration.
+     * </p>
      *
-     * @return true/false
+     * <p>
+     *     If this method returns <tt>0</tt>, Curator does not
+     *     do any additional checking for session expiration.
+     * </p>
+     *
+     * <p>
+     *     If a positive number is returned, Curator will check for session 
expiration
+     *     as follows: when ZooKeeper sends a Disconnect event, Curator will 
start a timer.
+     *     If re-connection is not achieved before the elapsed time exceeds 
the negotiated
+     *     session time multiplied by the session expiration percent, Curator 
will simulate
+     *     a session expiration. Due to timing/network issues, it is <b>not 
possible</b> for
+     *     a client to match the server's session timeout with complete 
accuracy. Thus, the need
+     *     for a session expiration percentage.
+     * </p>
+     *
+     * @return a percentage from 0 to 100 (0 implies no extra session checking)
      */
-    boolean isEmulatingClassicHandling();
+    int getSimulatedSessionExpirationPercent();
 
     /**
      * Called by {@link RetryLoop#callWithRetry(CuratorZookeeperClient, 
Callable)} to do the work

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
----------------------------------------------------------------------
diff --git 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
index ffbc4cb..9f311de 100644
--- 
a/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
+++ 
b/curator-client/src/main/java/org/apache/curator/connection/StandardConnectionHandlingPolicy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.connection;
 
+import com.google.common.base.Preconditions;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
 import org.slf4j.Logger;
@@ -32,11 +33,23 @@ import java.util.concurrent.Callable;
 public class StandardConnectionHandlingPolicy implements 
ConnectionHandlingPolicy
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
+    private final int expirationPercent;
+
+    public StandardConnectionHandlingPolicy()
+    {
+        this(100);
+    }
+
+    public StandardConnectionHandlingPolicy(int expirationPercent)
+    {
+        Preconditions.checkArgument((expirationPercent > 0) && 
(expirationPercent <= 100), "expirationPercent must be > 0 and <= 100");
+        this.expirationPercent = expirationPercent;
+    }
 
     @Override
-    public boolean isEmulatingClassicHandling()
+    public int getSimulatedSessionExpirationPercent()
     {
-        return false;
+        return expirationPercent;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
index e2f3c11..63ba665 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
@@ -33,12 +33,6 @@ class ClassicInternalConnectionHandler implements 
InternalConnectionHandler
     }
 
     @Override
-    public boolean checkSessionExpirationEnabled()
-    {
-        return false;
-    }
-
-    @Override
     public void suspendConnection(CuratorFrameworkImpl client)
     {
         if ( client.setToSuspended() )

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 848ccf2..da9067d 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -38,9 +38,9 @@ import 
org.apache.curator.framework.api.transaction.TransactionOp;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.framework.state.ConnectionStateManager;
-import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
@@ -128,14 +128,15 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                 builder.getConnectionHandlingPolicy()
             );
 
-        internalConnectionHandler = 
builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new 
ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
+        boolean isClassic = 
(builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent() 
== 0);
+        internalConnectionHandler = isClassic ? new 
ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new 
ListenerContainer<UnhandledErrorListener>();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getSessionTimeoutMs(), 
internalConnectionHandler.checkSessionExpirationEnabled());
+        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getSessionTimeoutMs(), 
builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new 
AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
index 978dced..65669c3 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
@@ -23,6 +23,4 @@ interface InternalConnectionHandler
     void checkNewConnection(CuratorFrameworkImpl client);
 
     void suspendConnection(CuratorFrameworkImpl client);
-
-    boolean checkSessionExpirationEnabled();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
index f600ad0..be0c726 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
@@ -27,12 +27,6 @@ class StandardInternalConnectionHandler implements 
InternalConnectionHandler
     }
 
     @Override
-    public boolean checkSessionExpirationEnabled()
-    {
-        return true;
-    }
-
-    @Override
     public void checkNewConnection(CuratorFrameworkImpl client)
     {
         client.checkInstanceIndex();

http://git-wip-us.apache.org/repos/asf/curator/blob/64bb8841/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 3d44d45..b6f2e02 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -66,7 +66,7 @@ public class ConnectionStateManager implements Closeable
     private final BlockingQueue<ConnectionState> eventQueue = new 
ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
     private final int sessionTimeoutMs;
-    private final boolean checkSessionExpiration;
+    private final int sessionExpirationPercent;
     private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
     private final ExecutorService service;
@@ -88,13 +88,13 @@ public class ConnectionStateManager implements Closeable
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
-     * @param checkSessionExpiration if true, check for session timeouts, etc. 
ala new connection handling method
+     * @param sessionExpirationPercent percentage of negotiated session 
timeout to use when simulating a session timeout. 0 means don't simulate at all
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
     {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
-        this.checkSessionExpiration = checkSessionExpiration;
+        this.sessionExpirationPercent = sessionExpirationPercent;
         if ( threadFactory == null )
         {
             threadFactory = 
ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -274,7 +274,7 @@ public class ConnectionStateManager implements Closeable
                             }
                         );
                 }
-                else if ( checkSessionExpiration )
+                else if ( sessionExpirationPercent > 0 )
                 {
                     synchronized(this)
                     {
@@ -296,9 +296,10 @@ public class ConnectionStateManager implements Closeable
             long elapsedMs = System.currentTimeMillis() - 
startOfSuspendedEpoch;
             int lastNegotiatedSessionTimeoutMs = 
client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs();
             int useSessionTimeoutMs = (lastNegotiatedSessionTimeoutMs > 0) ? 
lastNegotiatedSessionTimeoutMs : sessionTimeoutMs;
+            useSessionTimeoutMs = (useSessionTimeoutMs * 
sessionExpirationPercent) / 100;
             if ( elapsedMs >= useSessionTimeoutMs )
             {
-                log.warn(String.format("Session timeout has elapsed while 
SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Session Timeout ms: 
%d", elapsedMs, useSessionTimeoutMs));
+                log.warn(String.format("Session timeout has elapsed while 
SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session 
timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
                     // LOL - this method was proposed by me (JZ) in 2013 for 
totally unrelated reasons

Reply via email to