First pass at new (optional) definition of state LOST

Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/344634ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/344634ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/344634ac

Branch: refs/heads/CURATOR-248
Commit: 344634ac6e34e61bc0cc7b41923a1df4089c7948
Parents: 7d97259
Author: randgalt <randg...@apache.org>
Authored: Fri Aug 21 12:10:24 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Fri Aug 21 12:10:24 2015 -0500

----------------------------------------------------------------------
 .../framework/CuratorFrameworkFactory.java      | 19 +++++
 .../framework/api/UnhandledErrorListener.java   |  4 +-
 .../framework/imps/CuratorFrameworkImpl.java    | 10 ++-
 .../framework/state/ConnectionState.java        | 35 +++++++--
 .../framework/state/ConnectionStateManager.java | 75 +++++++++++++++-----
 5 files changed, 113 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..6209b06 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.framework.imps.GzipCompressionProvider;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.CreateMode;
@@ -116,6 +117,7 @@ public class CuratorFrameworkFactory
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
         private boolean useContainerParentsIfAvailable = true;
+        private boolean enableSessionExpiredState = false;
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -343,6 +345,18 @@ public class CuratorFrameworkFactory
             return this;
         }
 
+        /**
+         * Changes the meaning of {@link ConnectionState#LOST} from it's pre 
Curator 3.0.0 meaning
+         * to a true lost session state. See the {@link ConnectionState#LOST} 
doc for details.
+         *
+         * @return this
+         */
+        public Builder enableSessionExpiredState()
+        {
+            this.enableSessionExpiredState = true;
+            return this;
+        }
+
         public ACLProvider getAclProvider()
         {
             return aclProvider;
@@ -398,6 +412,11 @@ public class CuratorFrameworkFactory
             return useContainerParentsIfAvailable;
         }
 
+        public boolean getEnableSessionExpiredState()
+        {
+            return enableSessionExpiredState;
+        }
+
         @Deprecated
         public String getAuthScheme()
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
index b463af2..3721d4b 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/api/UnhandledErrorListener.java
@@ -24,9 +24,7 @@ import 
org.apache.curator.framework.state.ConnectionStateListener;
 public interface UnhandledErrorListener
 {
     /**
-     * Called when an exception is caught in a background thread, handler, 
etc. Before this
-     * listener is called, the error will have been logged and a {@link 
ConnectionState#LOST}
-     * event will have been queued for any {@link ConnectionStateListener}s.
+     * Called when an exception is caught in a background thread, handler, etc.
      *
      * @param message Source message
      * @param e exception

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 41bb7cd..c64fb8f 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -83,6 +83,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new 
NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
+    private final boolean enableSessionExpiredState;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new 
AtomicBoolean(false);
@@ -119,11 +120,12 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory());
+        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getEnableSessionExpiredState(), 
builder.getSessionTimeoutMs());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new 
AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = 
builder.useContainerParentsIfAvailable();
+        enableSessionExpiredState = builder.getEnableSessionExpiredState();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? 
Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -197,6 +199,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+        enableSessionExpiredState = parent.enableSessionExpiredState;
     }
 
     @Override
@@ -722,7 +725,10 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
             return;
         }
 
-        doSyncForSuspendedConnection(client.getInstanceIndex());
+        if ( !enableSessionExpiredState )
+        {
+            doSyncForSuspendedConnection(client.getInstanceIndex());
+        }
     }
 
     private void doSyncForSuspendedConnection(final long instanceIndex)

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
index 3ca1d66..49d0044 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionState.java
@@ -19,6 +19,8 @@
 package org.apache.curator.framework.state;
 
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 
 /**
  * Represents state changes in the connection to ZK
@@ -39,8 +41,7 @@ public enum ConnectionState
 
     /**
      * There has been a loss of connection. Leaders, locks, etc. should suspend
-     * until the connection is re-established. If the connection times-out you 
will
-     * receive a {@link #LOST} notice
+     * until the connection is re-established.
      */
     SUSPENDED
     {
@@ -62,9 +63,29 @@ public enum ConnectionState
     },
 
     /**
-     * The connection is confirmed to be lost. Close any locks, leaders, etc. 
and
-     * attempt to re-create them. NOTE: it is possible to get a {@link 
#RECONNECTED}
-     * state after this but you should still consider any locks, etc. as 
dirty/unstable
+     * <p>
+     *     NOTE: the meaning of this state depends on how your 
CuratorFramework instance
+     *     is created.
+     * </p>
+     *
+     * <p>
+     *     The default meaning of LOST (and the only meaning prior to Curator 
3.0.0) is:
+     *     The connection is confirmed to be lost (i.e. the retry policy has 
given up). Close any locks, leaders, etc. and
+     *     attempt to re-create them. NOTE: it is possible to get a {@link 
#RECONNECTED}
+     *     state after this but you should still consider any locks, etc. as 
dirty/unstable
+     * </p>
+     *
+     * <p>
+     *     <strong>Since 3.0.0</strong>, you can alter the meaning of LOST by 
calling
+     *     {@link 
CuratorFrameworkFactory.Builder#enableSessionExpiredState()}. In this mode,
+     *     Curator will set the LOST state only when it believes that the 
ZooKeeper session
+     *     has expired. ZooKeeper connections have a session. When the session 
expires, clients must take appropriate
+     *     action. In Curator, this is complicated by the fact that Curator 
internally manages the ZooKeeper
+     *     connection. In this mode, Curator will set the LOST state when any 
of the following occurs:
+     *     a) ZooKeeper returns a {@link Watcher.Event.KeeperState#Expired} or 
{@link KeeperException.Code#SESSIONEXPIRED};
+     *     b) Curator closes the internally managed ZooKeeper instance; c) The 
configured session timeout
+     *     elapses during a network partition.
+     * </p>
      */
     LOST
     {
@@ -87,7 +108,9 @@ public enum ConnectionState
         {
             return true;
         }
-    };
+    }
+
+    ;
     
     /**
      * Check if this state indicates that Curator has a connection to ZooKeeper

http://git-wip-us.apache.org/repos/asf/curator/blob/344634ac/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 67ff13d..c0feb84 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -65,6 +66,8 @@ public class ConnectionStateManager implements Closeable
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final BlockingQueue<ConnectionState> eventQueue = new 
ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
+    private final boolean enableSessionExpiredState;
+    private final int sessionTimeoutMs;
     private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
     private final ExecutorService service;
@@ -72,6 +75,8 @@ public class ConnectionStateManager implements Closeable
 
     // guarded by sync
     private ConnectionState currentConnectionState;
+    // guarded by sync
+    private long startOfSuspendedEpoch = 0;
 
     private enum State
     {
@@ -83,10 +88,14 @@ public class ConnectionStateManager implements Closeable
     /**
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
+     * @param enableSessionExpiredState if true, applies new meaning for LOST 
as described here: {@link ConnectionState#LOST}
+     * @param sessionTimeoutMs the ZK session timeout in milliseconds
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, boolean enableSessionExpiredState, int sessionTimeoutMs)
     {
         this.client = client;
+        this.enableSessionExpiredState = enableSessionExpiredState;
+        this.sessionTimeoutMs = sessionTimeoutMs;
         if ( threadFactory == null )
         {
             threadFactory = 
ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -137,7 +146,7 @@ public class ConnectionStateManager implements Closeable
 
     /**
      * Change to {@link ConnectionState#SUSPENDED} only if not already 
suspended and not lost
-     * 
+     *
      * @return true if connection is set to SUSPENDED
      */
     public synchronized boolean setToSuspended()
@@ -152,7 +161,7 @@ public class ConnectionStateManager implements Closeable
             return false;
         }
 
-        currentConnectionState = ConnectionState.SUSPENDED;
+        setCurrentConnectionState(ConnectionState.SUSPENDED);
         postState(ConnectionState.SUSPENDED);
 
         return true;
@@ -177,7 +186,7 @@ public class ConnectionStateManager implements Closeable
         {
             return false;
         }
-        currentConnectionState = newConnectionState;
+        setCurrentConnectionState(newConnectionState);
 
         ConnectionState localState = newConnectionState;
         boolean isNegativeMessage = ((newConnectionState == 
ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || 
(newConnectionState == ConnectionState.READ_ONLY));
@@ -242,25 +251,34 @@ public class ConnectionStateManager implements Closeable
         {
             while ( !Thread.currentThread().isInterrupted() )
             {
-                final ConnectionState newState = eventQueue.take();
-
-                if ( listeners.size() == 0 )
+                final ConnectionState newState = 
eventQueue.poll(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                if ( newState != null )
                 {
-                    log.warn("There are no ConnectionStateListeners 
registered.");
-                }
+                    if ( listeners.size() == 0 )
+                    {
+                        log.warn("There are no ConnectionStateListeners 
registered.");
+                    }
 
-                listeners.forEach
-                    (
-                        new Function<ConnectionStateListener, Void>()
-                        {
-                            @Override
-                            public Void apply(ConnectionStateListener listener)
+                    listeners.forEach
+                        (
+                            new Function<ConnectionStateListener, Void>()
                             {
-                                listener.stateChanged(client, newState);
-                                return null;
+                                @Override
+                                public Void apply(ConnectionStateListener 
listener)
+                                {
+                                    listener.stateChanged(client, newState);
+                                    return null;
+                                }
                             }
-                        }
-                    );
+                        );
+                }
+                else if ( enableSessionExpiredState )
+                {
+                    synchronized(this)
+                    {
+                        checkSessionExpiration();
+                    }
+                }
             }
         }
         catch ( InterruptedException e )
@@ -268,4 +286,23 @@ public class ConnectionStateManager implements Closeable
             Thread.currentThread().interrupt();
         }
     }
+
+    private void checkSessionExpiration()
+    {
+        if ( (currentConnectionState == ConnectionState.SUSPENDED) && 
(startOfSuspendedEpoch != 0) )
+        {
+            long elapsedMs = System.currentTimeMillis() - 
startOfSuspendedEpoch;
+            if ( elapsedMs >= sessionTimeoutMs )
+            {
+                log.info(String.format("Session timeout has elapsed while 
SUSPENDED. Posting LOST event. Elapsed ms: %d", elapsedMs));
+                addStateChange(ConnectionState.LOST);
+            }
+        }
+    }
+
+    private void setCurrentConnectionState(ConnectionState newConnectionState)
+    {
+        currentConnectionState = newConnectionState;
+        startOfSuspendedEpoch = (currentConnectionState == 
ConnectionState.SUSPENDED) ? System.currentTimeMillis() : 0;
+    }
 }

Reply via email to