Repository: curator
Updated Branches:
  refs/heads/master cd365c414 -> 1fade17a2


To avoid massive spinning, background operations are paused for 1 second when 
there is no connection. However, this can hurt performance terribly if 
background operations are queued, for example, prior to the initial connection. 
This commit changes the behavior so that the sleeps are cleared when the 
connection is re-established. A separate queue of "forced sleep" operations is 
kept while the connection is down. The operations in this queue then have their 
sleep cleared when the connection is re-established.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdb790b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdb790b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdb790b

Branch: refs/heads/master
Commit: bfdb790bc69022b0eef74558e9511e6ed2b665e9
Parents: d502dde
Author: randgalt <randg...@apache.org>
Authored: Tue Nov 21 18:11:01 2017 -0800
Committer: randgalt <randg...@apache.org>
Committed: Tue Nov 21 18:11:01 2017 -0800

----------------------------------------------------------------------
 .../framework/imps/CuratorFrameworkImpl.java    | 37 ++++++++++++++++++--
 .../framework/imps/OperationAndData.java        |  5 +++
 .../framework/imps/TestFrameworkEdges.java      | 31 ++++++++++++++++
 3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 7488793..4a5fad3 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -54,13 +54,16 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -76,6 +79,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final ThreadFactory threadFactory;
     private final int maxCloseWaitMs;
     private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+    private final BlockingQueue<OperationAndData<?>> forcedSleepOperations;
     private final NamespaceImpl namespace;
     private final ConnectionStateManager connectionStateManager;
     private final List<AuthInfo> authInfos;
@@ -136,6 +140,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         listeners = new ListenerContainer<CuratorListener>();
         unhandledErrorListeners = new 
ListenerContainer<UnhandledErrorListener>();
         backgroundOperations = new DelayQueue<OperationAndData<?>>();
+        forcedSleepOperations = new LinkedBlockingQueue<>();
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         maxCloseWaitMs = builder.getMaxCloseWaitMs();
@@ -217,6 +222,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         threadFactory = parent.threadFactory;
         maxCloseWaitMs = parent.maxCloseWaitMs;
         backgroundOperations = parent.backgroundOperations;
+        forcedSleepOperations = parent.forcedSleepOperations;
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
@@ -640,12 +646,14 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         }
     }
 
-    <DATA_TYPE> void queueOperation(OperationAndData<DATA_TYPE> 
operationAndData)
+    <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> 
operationAndData)
     {
         if ( getState() == CuratorFrameworkState.STARTED )
         {
             backgroundOperations.offer(operationAndData);
+            return true;
         }
+        return false;
     }
 
     void logError(String reason, final Throwable e)
@@ -730,6 +738,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         {
             internalConnectionHandler.checkNewConnection(this);
             connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
+            unSleepBackgroundOperations();
         }
         else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
         {
@@ -940,8 +949,7 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
                 {
                     throw new CuratorConnectionLossException();
                 }
-                operationAndData.sleepFor(1, TimeUnit.SECONDS);
-                queueOperation(operationAndData);
+                sleepAndQueueOperation(operationAndData);
             }
         }
         catch ( Throwable e )
@@ -973,6 +981,29 @@ public class CuratorFrameworkImpl implements 
CuratorFramework
         }
     }
 
+    @VisibleForTesting
+    volatile long sleepAndQueueOperationSeconds = 1;
+
+    private void sleepAndQueueOperation(OperationAndData<?> operationAndData) 
throws InterruptedException
+    {
+        operationAndData.sleepFor(sleepAndQueueOperationSeconds, 
TimeUnit.SECONDS);
+        if ( queueOperation(operationAndData) )
+        {
+            forcedSleepOperations.add(operationAndData);
+        }
+    }
+
+    private void unSleepBackgroundOperations()
+    {
+        Collection<OperationAndData<?>> drain = new 
ArrayList<>(forcedSleepOperations.size());
+        forcedSleepOperations.drainTo(drain);
+        log.debug("Clearing sleep for {} operations", drain.size());
+        for ( OperationAndData<?> operation : drain )
+        {
+            operation.clearSleep();
+        }
+    }
+
     private void processEvent(final CuratorEvent curatorEvent)
     {
         if ( curatorEvent.getType() == CuratorEventType.WATCHED )

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 3d69e5d..8370415 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -115,6 +115,11 @@ class OperationAndData<T> implements Delayed, RetrySleeper
         return operation;
     }
 
+    void clearSleep()
+    {
+        sleepUntilTimeMs.set(0);
+    }
+
     @Override
     public void sleepFor(long time, TimeUnit unit) throws InterruptedException
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
index 9c4afe3..35e9fb1 100644
--- 
a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
+++ 
b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java
@@ -61,6 +61,37 @@ public class TestFrameworkEdges extends BaseClassForTests
     private final Timing2 timing = new Timing2();
 
     @Test
+    public void testBackgroundLatencyUnSleep() throws Exception
+    {
+        server.stop();
+
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+            ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = 
Integer.MAX_VALUE;
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception
+                {
+                    latch.countDown();
+                }
+            };
+            client.create().inBackground(callback).forPath("/test");
+            server.restart();
+
+            Assert.assertTrue(timing.awaitLatch(latch));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testCreateContainersForBadConnect() throws Exception
     {
         final int serverPort = server.getPort();

Reply via email to