The true session timeout is a negotiated value between client and server. The
new session timeout handling should use the negotiated value if available.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/966b8dfc
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/966b8dfc
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/966b8dfc

Branch: refs/heads/CURATOR-248
Commit: 966b8dfc8897f766f70a2334c90fa0913e06f996
Parents: 9c7cf5d
Author: randgalt <randg...@apache.org>
Authored: Sun Aug 23 20:31:21 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Aug 23 20:31:21 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/ConnectionState.java | 12 ++++++++++++
 .../apache/curator/CuratorZookeeperClient.java   | 10 ++++++++++
 .../java/org/apache/curator/HandleHolder.java    | 19 +++++++++++++++++++
 .../framework/state/ConnectionStateManager.java  |  5 +++--
 4 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/966b8dfc/curator-client/src/main/java/org/apache/curator/ConnectionState.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/ConnectionState.java b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
index d6ddd33..555a52d 100644
--- a/curator-client/src/main/java/org/apache/curator/ConnectionState.java
+++ b/curator-client/src/main/java/org/apache/curator/ConnectionState.java
@@ -36,6 +36,7 @@ import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,6 +47,7 @@ class ConnectionState implements Watcher, Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final HandleHolder zooKeeper;
     private final AtomicBoolean isConnected = new AtomicBoolean(false);
+    private final AtomicInteger lastNegotiatedSessionTimeoutMs = new AtomicInteger(0);
     private final EnsembleProvider ensembleProvider;
     private final int sessionTimeoutMs;
     private final int connectionTimeoutMs;
@@ -141,6 +143,11 @@ class ConnectionState implements Watcher, Closeable
         return instanceIndex.get();
     }
 
+    int getLastNegotiatedSessionTimeoutMs()
+    {
+        return lastNegotiatedSessionTimeoutMs.get();
+    }
+
     @Override
     public void process(WatchedEvent event)
     {
@@ -167,6 +174,11 @@ class ConnectionState implements Watcher, Closeable
         {
             isConnected.set(newIsConnected);
             connectionStartMs = System.currentTimeMillis();
+            if ( newIsConnected )
+            {
+                lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
+                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/966b8dfc/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
index d6c2072..471adf0 100644
--- a/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
+++ b/curator-client/src/main/java/org/apache/curator/CuratorZookeeperClient.java
@@ -334,6 +334,16 @@ public class CuratorZookeeperClient implements Closeable
         return connectionHandlingPolicy;
     }
 
+    /**
+     * Return the most recent value of {@link ZooKeeper#getSessionTimeout()} or 0
+     *
+     * @return session timeout or 0
+     */
+    public int getLastNegotiatedSessionTimeoutMs()
+    {
+        return state.getLastNegotiatedSessionTimeoutMs();
+    }
+
     void addParentWatcher(Watcher watcher)
     {
         state.addParentWatcher(watcher);

http://git-wip-us.apache.org/repos/asf/curator/blob/966b8dfc/curator-client/src/main/java/org/apache/curator/HandleHolder.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/HandleHolder.java b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
index 1f7cd91..8652f0c 100644
--- a/curator-client/src/main/java/org/apache/curator/HandleHolder.java
+++ b/curator-client/src/main/java/org/apache/curator/HandleHolder.java
@@ -39,6 +39,8 @@ class HandleHolder
         ZooKeeper getZooKeeper() throws Exception;
         
         String getConnectionString();
+
+        int getNegotiatedSessionTimeoutMs();
     }
 
     HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
@@ -55,6 +57,11 @@ class HandleHolder
         return (helper != null) ? helper.getZooKeeper() : null;
     }
 
+    int getNegotiatedSessionTimeoutMs()
+    {
+        return (helper != null) ? helper.getNegotiatedSessionTimeoutMs() : 0;
+    }
+
     String  getConnectionString()
     {
         return (helper != null) ? helper.getConnectionString() : null;
@@ -107,6 +114,12 @@ class HandleHolder
                         {
                             return connectionString;
                         }
+
+                        @Override
+                        public int getNegotiatedSessionTimeoutMs()
+                        {
+                            return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
+                        }
                     };
 
                     return zooKeeperHandle;
@@ -118,6 +131,12 @@ class HandleHolder
             {
                 return connectionString;
             }
+
+            @Override
+            public int getNegotiatedSessionTimeoutMs()
+            {
+                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/966b8dfc/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 44d511b..3da7534 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -292,9 +292,10 @@ public class ConnectionStateManager implements Closeable
         if ( (currentConnectionState == ConnectionState.SUSPENDED) && (startOfSuspendedEpoch != 0) )
         {
             long elapsedMs = System.currentTimeMillis() - startOfSuspendedEpoch;
-            if ( elapsedMs >= sessionTimeoutMs )
+            int useSessionTimeoutMs = Math.max(client.getZookeeperClient().getLastNegotiatedSessionTimeoutMs(), sessionTimeoutMs);
+            if ( elapsedMs >= useSessionTimeoutMs )
             {
-                log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d", elapsedMs));
+                log.warn(String.format("Session timeout has elapsed while SUSPENDED. Posting LOST event and resetting the connection. Elapsed ms: %d. Session Timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
                     client.getZookeeperClient().reset();

Reply via email to