further refactoring. Abstracted old framework-level connection handling into 
ClassicInternalConnectionHandler. Probably more to do here


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30bd7b65
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30bd7b65
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30bd7b65

Branch: refs/heads/CURATOR-248
Commit: 30bd7b655d201762d8ff74062964621879ac7134
Parents: e239137
Author: randgalt <randg...@apache.org>
Authored: Sat Aug 22 19:29:36 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sat Aug 22 19:29:36 2015 -0500

----------------------------------------------------------------------
 .../imps/ClassicInternalConnectionHandler.java  | 58 ++++++++++++++++++
 .../framework/imps/CuratorFrameworkImpl.java    | 64 ++++++--------------
 .../imps/InternalConnectionHandler.java         | 10 +++
 .../imps/StandardInternalConnectionHandler.java | 22 +++++++
 .../framework/state/ConnectionStateManager.java |  8 ++-
 5 files changed, 112 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
new file mode 100644
index 0000000..1de6e80
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/ClassicInternalConnectionHandler.java
@@ -0,0 +1,58 @@
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.state.ConnectionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ClassicInternalConnectionHandler implements InternalConnectionHandler
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public void checkNewConnection(CuratorFrameworkImpl client)
+    {
+        // NOP
+    }
+
+    @Override
+    public boolean checkSessionExpirationEnabled()
+    {
+        return false;
+    }
+
+    @Override
+    public void suspendConnection(CuratorFrameworkImpl client)
+    {
+        if ( client.setToSuspended() )
+        {
+            doSyncForSuspendedConnection(client, 
client.getZookeeperClient().getInstanceIndex());
+        }
+    }
+
+    private void doSyncForSuspendedConnection(final CuratorFrameworkImpl 
client, final long instanceIndex)
+    {
+        // we appear to have disconnected, force a new ZK event and see if we 
can connect to another server
+        final BackgroundOperation<String> operation = new 
BackgroundSyncImpl(client, null);
+        OperationAndData.ErrorCallback<String> errorCallback = new 
OperationAndData.ErrorCallback<String>()
+        {
+            @Override
+            public void retriesExhausted(OperationAndData<String> 
operationAndData)
+            {
+                // if instanceIndex != newInstanceIndex, the ZooKeeper 
instance was reset/reallocated
+                // so the pending background sync is no longer valid.
+                // if instanceIndex is -1, this is the second try to sync - 
punt and mark the connection lost
+                if ( (instanceIndex < 0) || (instanceIndex == 
client.getZookeeperClient().getInstanceIndex()) )
+                {
+                    client.addStateChange(ConnectionState.LOST);
+                }
+                else
+                {
+                    log.debug("suspendConnection() failure ignored as the 
ZooKeeper instance was reset. Retrying.");
+                    // send -1 to signal that if it happens again, punt and 
mark the connection lost
+                    doSyncForSuspendedConnection(client, -1);
+                }
+            }
+        };
+        client.performBackgroundOperation(new 
OperationAndData<String>(operation, "/", null, errorCallback, null));
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 44a8ec6..b04987d 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -85,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final NamespaceWatcherMap namespaceWatcherMap = new 
NamespaceWatcherMap(this);
     private final boolean useContainerParentsIfAvailable;
     private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
+    private final InternalConnectionHandler internalConnectionHandler;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new 
AtomicBoolean(false);
@@ -125,13 +126,14 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                 builder.getConnectionHandlingPolicy()
             );
 
+        internalConnectionHandler = 
builder.getConnectionHandlingPolicy().isEmulatingClassicHandling() ? new 
ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new 
ListenerContainer<UnhandledErrorListener>();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
-        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getSessionTimeoutMs());
+        connectionStateManager = new ConnectionStateManager(this, 
builder.getThreadFactory(), builder.getSessionTimeoutMs(), 
internalConnectionHandler.checkSessionExpirationEnabled());
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new 
AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -209,6 +211,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         state = parent.state;
         authInfos = parent.authInfos;
         useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+        internalConnectionHandler = parent.internalConnectionHandler;
     }
 
     @Override
@@ -676,7 +679,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
     {
         if ( state == Watcher.Event.KeeperState.Disconnected )
         {
-            suspendConnection();
+            internalConnectionHandler.suspendConnection(this);
         }
         else if ( state == Watcher.Event.KeeperState.Expired )
         {
@@ -684,26 +687,23 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         }
         else if ( state == Watcher.Event.KeeperState.SyncConnected )
         {
-            checkNewConnection();
+            internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
-            checkNewConnection();
+            internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
         }
     }
 
-    private void checkNewConnection()
+    void checkInstanceIndex()
     {
-        if ( 
!client.getConnectionHandlingPolicy().isEmulatingClassicHandling() )
+        long instanceIndex = client.getInstanceIndex();
+        long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
+        if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) )  
 // currentInstanceIndex is initially -1 - ignore this
         {
-            long instanceIndex = client.getInstanceIndex();
-            long newInstanceIndex = 
currentInstanceIndex.getAndSet(instanceIndex);
-            if ( (newInstanceIndex >= 0) && (instanceIndex != 
newInstanceIndex) )   // currentInstanceIndex is initially -1 - ignore this
-            {
-                connectionStateManager.addStateChange(ConnectionState.LOST);
-            }
+            connectionStateManager.addStateChange(ConnectionState.LOST);
         }
     }
 
@@ -742,44 +742,14 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         return null;
     }
 
-    private void suspendConnection()
+    boolean setToSuspended()
     {
-        if ( !connectionStateManager.setToSuspended() )
-        {
-            return;
-        }
-
-        if ( client.getConnectionHandlingPolicy().isEmulatingClassicHandling() 
)
-        {
-            doSyncForSuspendedConnection(client.getInstanceIndex());
-        }
+        return connectionStateManager.setToSuspended();
     }
 
-    private void doSyncForSuspendedConnection(final long instanceIndex)
+    void addStateChange(ConnectionState newConnectionState)
     {
-        // we appear to have disconnected, force a new ZK event and see if we 
can connect to another server
-        final BackgroundOperation<String> operation = new 
BackgroundSyncImpl(this, null);
-        OperationAndData.ErrorCallback<String> errorCallback = new 
OperationAndData.ErrorCallback<String>()
-        {
-            @Override
-            public void retriesExhausted(OperationAndData<String> 
operationAndData)
-            {
-                // if instanceIndex != newInstanceIndex, the ZooKeeper 
instance was reset/reallocated
-                // so the pending background sync is no longer valid.
-                // if instanceIndex is -1, this is the second try to sync - 
punt and mark the connection lost
-                if ( (instanceIndex < 0) || (instanceIndex == 
client.getInstanceIndex()) )
-                {
-                    
connectionStateManager.addStateChange(ConnectionState.LOST);
-                }
-                else
-                {
-                    log.debug("suspendConnection() failure ignored as the 
ZooKeeper instance was reset. Retrying.");
-                    // send -1 to signal that if it happens again, punt and 
mark the connection lost
-                    doSyncForSuspendedConnection(-1);
-                }
-            }
-        };
-        performBackgroundOperation(new OperationAndData<String>(operation, 
"/", null, errorCallback, null));
+        connectionStateManager.addStateChange(newConnectionState);
     }
 
     @SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
@@ -894,7 +864,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         }
     }
 
-    private void performBackgroundOperation(OperationAndData<?> 
operationAndData)
+    void performBackgroundOperation(OperationAndData<?> operationAndData)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
new file mode 100644
index 0000000..e9798d7
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/InternalConnectionHandler.java
@@ -0,0 +1,10 @@
+package org.apache.curator.framework.imps;
+
+interface InternalConnectionHandler
+{
+    void checkNewConnection(CuratorFrameworkImpl client);
+
+    void suspendConnection(CuratorFrameworkImpl client);
+
+    boolean checkSessionExpirationEnabled();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
new file mode 100644
index 0000000..b0452c6
--- /dev/null
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/StandardInternalConnectionHandler.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.imps;
+
+class StandardInternalConnectionHandler implements InternalConnectionHandler
+{
+    @Override
+    public void suspendConnection(CuratorFrameworkImpl client)
+    {
+        client.setToSuspended();
+    }
+
+    @Override
+    public boolean checkSessionExpirationEnabled()
+    {
+        return true;
+    }
+
+    @Override
+    public void checkNewConnection(CuratorFrameworkImpl client)
+    {
+        client.checkInstanceIndex();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/30bd7b65/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 2e7492f..406099d 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -21,7 +21,6 @@ package org.apache.curator.framework.state;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import org.apache.curator.connection.ConnectionHandlingPolicyStyle;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.utils.ThreadUtils;
@@ -67,6 +66,7 @@ public class ConnectionStateManager implements Closeable
     private final BlockingQueue<ConnectionState> eventQueue = new 
ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
     private final CuratorFramework client;
     private final int sessionTimeoutMs;
+    private final boolean checkSessionExpiration;
     private final ListenerContainer<ConnectionStateListener> listeners = new 
ListenerContainer<ConnectionStateListener>();
     private final AtomicBoolean initialConnectMessageSent = new 
AtomicBoolean(false);
     private final ExecutorService service;
@@ -88,11 +88,13 @@ public class ConnectionStateManager implements Closeable
      * @param client        the client
      * @param threadFactory thread factory to use or null for a default
      * @param sessionTimeoutMs the ZK session timeout in milliseconds
+     * @param checkSessionExpiration if true, check for session timeouts, etc. 
ala new connection handling method
      */
-    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, int sessionTimeoutMs)
+    public ConnectionStateManager(CuratorFramework client, ThreadFactory 
threadFactory, int sessionTimeoutMs, boolean checkSessionExpiration)
     {
         this.client = client;
         this.sessionTimeoutMs = sessionTimeoutMs;
+        this.checkSessionExpiration = checkSessionExpiration;
         if ( threadFactory == null )
         {
             threadFactory = 
ThreadUtils.newThreadFactory("ConnectionStateManager");
@@ -270,7 +272,7 @@ public class ConnectionStateManager implements Closeable
                             }
                         );
                 }
-                else if ( 
!client.getZookeeperClient().getConnectionHandlingPolicy().isEmulatingClassicHandling()
 )
+                else if ( checkSessionExpiration )
                 {
                     synchronized(this)
                     {

Reply via email to