Repository: curator Updated Branches: refs/heads/master cd365c414 -> 1fade17a2
To avoid massive spinning, background operations are paused for 1 second when there is no connection. However, this can hurt performance terribly if background operations are queued, for example, prior to initial connection. This changes the behavior so that the sleeps are cleared when the connection is re-established. A separate queue of "forced sleep" operations are kept while the connection is down. This queue then gets its sleep cleared when the connection is re-established. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdb790b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdb790b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdb790b Branch: refs/heads/master Commit: bfdb790bc69022b0eef74558e9511e6ed2b665e9 Parents: d502dde Author: randgalt <randg...@apache.org> Authored: Tue Nov 21 18:11:01 2017 -0800 Committer: randgalt <randg...@apache.org> Committed: Tue Nov 21 18:11:01 2017 -0800 ---------------------------------------------------------------------- .../framework/imps/CuratorFrameworkImpl.java | 37 ++++++++++++++++++-- .../framework/imps/OperationAndData.java | 5 +++ .../framework/imps/TestFrameworkEdges.java | 31 ++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 7488793..4a5fad3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -54,13 +54,16 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,6 +79,7 @@ public class CuratorFrameworkImpl implements CuratorFramework private final ThreadFactory threadFactory; private final int maxCloseWaitMs; private final BlockingQueue<OperationAndData<?>> backgroundOperations; + private final BlockingQueue<OperationAndData<?>> forcedSleepOperations; private final NamespaceImpl namespace; private final ConnectionStateManager connectionStateManager; private final List<AuthInfo> authInfos; @@ -136,6 +140,7 @@ public class CuratorFrameworkImpl implements CuratorFramework listeners = new ListenerContainer<CuratorListener>(); unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>(); backgroundOperations = new DelayQueue<OperationAndData<?>>(); + forcedSleepOperations = new LinkedBlockingQueue<>(); namespace = new NamespaceImpl(this, builder.getNamespace()); threadFactory = getThreadFactory(builder); maxCloseWaitMs = builder.getMaxCloseWaitMs(); @@ -217,6 +222,7 @@ public class CuratorFrameworkImpl implements CuratorFramework threadFactory = parent.threadFactory; maxCloseWaitMs = parent.maxCloseWaitMs; backgroundOperations = parent.backgroundOperations; + forcedSleepOperations = parent.forcedSleepOperations; connectionStateManager = parent.connectionStateManager; defaultData = parent.defaultData; failedDeleteManager = parent.failedDeleteManager; @@ -640,12 +646,14 @@ public class CuratorFrameworkImpl implements CuratorFramework } } - <DATA_TYPE> void queueOperation(OperationAndData<DATA_TYPE> operationAndData) + <DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData) { if ( getState() == CuratorFrameworkState.STARTED ) { backgroundOperations.offer(operationAndData); + return true; } + return false; } void logError(String reason, final Throwable e) @@ -730,6 +738,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { internalConnectionHandler.checkNewConnection(this); connectionStateManager.addStateChange(ConnectionState.RECONNECTED); + unSleepBackgroundOperations(); } else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly ) { @@ -940,8 +949,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { throw new CuratorConnectionLossException(); } - operationAndData.sleepFor(1, TimeUnit.SECONDS); - queueOperation(operationAndData); + sleepAndQueueOperation(operationAndData); } } catch ( Throwable e ) @@ -973,6 +981,29 @@ public class CuratorFrameworkImpl implements CuratorFramework } } + @VisibleForTesting + volatile long sleepAndQueueOperationSeconds = 1; + + private void sleepAndQueueOperation(OperationAndData<?> operationAndData) throws InterruptedException + { + operationAndData.sleepFor(sleepAndQueueOperationSeconds, TimeUnit.SECONDS); + if ( queueOperation(operationAndData) ) + { + forcedSleepOperations.add(operationAndData); + } + } + + private void unSleepBackgroundOperations() + { + Collection<OperationAndData<?>> drain = new ArrayList<>(forcedSleepOperations.size()); + forcedSleepOperations.drainTo(drain); + log.debug("Clearing sleep for {} operations", drain.size()); + for ( OperationAndData<?> operation : drain ) + { + operation.clearSleep(); + } + } + private void processEvent(final CuratorEvent curatorEvent) { if ( curatorEvent.getType() == CuratorEventType.WATCHED ) http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java index 3d69e5d..8370415 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java @@ -115,6 +115,11 @@ class OperationAndData<T> implements Delayed, RetrySleeper return operation; } + void clearSleep() + { + sleepUntilTimeMs.set(0); + } + @Override public void sleepFor(long time, TimeUnit unit) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/curator/blob/bfdb790b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 9c4afe3..35e9fb1 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -61,6 +61,37 @@ public class TestFrameworkEdges extends BaseClassForTests private final Timing2 timing = new Timing2(); @Test + public void testBackgroundLatencyUnSleep() throws Exception + { + server.stop(); + + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + ((CuratorFrameworkImpl)client).sleepAndQueueOperationSeconds = Integer.MAX_VALUE; + + final CountDownLatch latch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + latch.countDown(); + } + }; + client.create().inBackground(callback).forPath("/test"); + server.restart(); + + Assert.assertTrue(timing.awaitLatch(latch)); + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + @Test public void testCreateContainersForBadConnect() throws Exception { final int serverPort = server.getPort();