Repository: curator Updated Branches: refs/heads/CURATOR-3.0 eefdf8ee9 -> e2200daad
Reworked WatcherRemovalManager. It now stores watchers only on successful operations. This is more like how ZK does it. Also, exists watcher must be stored when there is a NoNode result. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f59f23c7 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f59f23c7 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f59f23c7 Branch: refs/heads/CURATOR-3.0 Commit: f59f23c703815317d4ef1d39e2b815e402d1559b Parents: eefdf8e Author: randgalt <randg...@apache.org> Authored: Thu May 26 16:59:08 2016 -0500 Committer: randgalt <randg...@apache.org> Committed: Thu May 26 16:59:08 2016 -0500 ---------------------------------------------------------------------- curator-framework/pom.xml | 14 ++ .../curator/framework/imps/Backgrounding.java | 5 - .../framework/imps/CuratorFrameworkImpl.java | 2 - .../curator/framework/imps/EnsembleTracker.java | 22 +- .../framework/imps/ExistsBuilderImpl.java | 7 +- .../framework/imps/GetChildrenBuilderImpl.java | 8 +- .../framework/imps/GetConfigBuilderImpl.java | 11 +- .../framework/imps/GetDataBuilderImpl.java | 7 +- .../framework/imps/OperationAndData.java | 14 +- .../imps/RemoveWatchesBuilderImpl.java | 2 +- .../apache/curator/framework/imps/Watching.java | 41 +--- .../curator/framework/imps/TestCleanState.java | 103 +++++++++ .../imps/TestWatcherRemovalManager.java | 208 +++++++++++++++---- curator-recipes/pom.xml | 7 + .../curator/framework/imps/TestCleanState.java | 77 ------- pom.xml | 7 + 16 files changed, 357 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/pom.xml ---------------------------------------------------------------------- diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml index d6575cc..1a65898 100644 --- a/curator-framework/pom.xml +++ b/curator-framework/pom.xml @@ -88,4 +88,18 @@ </dependency> </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java index 0b823c4..4ac2edc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Backgrounding.java @@ -116,11 +116,6 @@ class Backgrounding { if ( e != null ) { - if ( watching != null ) - { - watching.resetCurrentWatcher(); - } - if ( errorListener != null ) { errorListener.unhandledError("n/a", e); http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 51485f2..aba14c6 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -910,7 +910,6 @@ public class CuratorFrameworkImpl implements CuratorFramework void performBackgroundOperation(OperationAndData<?> operationAndData) { - operationAndData.resetCurrentWatcher(); try { if ( !operationAndData.isConnectionRequired() || client.isConnected() ) @@ -930,7 +929,6 @@ public class CuratorFrameworkImpl implements CuratorFramework } catch ( Throwable e ) { - operationAndData.resetCurrentWatcher(); ThreadUtils.checkInterrupted(e); /** http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index bc59512..0b93cab 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -43,6 +43,7 @@ import java.io.ByteArrayInputStream; import java.io.Closeable; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @VisibleForTesting @@ -52,6 +53,7 @@ public class EnsembleTracker implements Closeable, CuratorWatcher private final WatcherRemoveCuratorFramework client; private final EnsembleProvider ensembleProvider; private final AtomicReference<State> state = new AtomicReference<>(State.LATENT); + private final AtomicInteger outstanding = new AtomicInteger(0); private final AtomicReference<QuorumMaj> currentConfig = new AtomicReference<>(new QuorumMaj(Maps.<Long, QuorumPeer.QuorumServer>newHashMap())); private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @@ -121,22 +123,38 @@ public class EnsembleTracker implements Closeable, CuratorWatcher return currentConfig.get(); } + @VisibleForTesting + public boolean hasOutstanding() + { + return outstanding.get() > 0; + } + private void reset() throws Exception { - if ( client.getState() == CuratorFrameworkState.STARTED ) + if ( (client.getState() == CuratorFrameworkState.STARTED) && (state.get() == State.STARTED) ) { BackgroundCallback backgroundCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + outstanding.decrementAndGet(); if ( (event.getType() == CuratorEventType.GET_CONFIG) && (event.getResultCode() == KeeperException.Code.OK.intValue()) ) { processConfigData(event.getData()); } } }; - client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble(); + outstanding.incrementAndGet(); + try + { + client.getConfig().usingWatcher(this).inBackground(backgroundCallback).forEnsemble(); + } + catch ( Exception e ) + { + outstanding.decrementAndGet(); + throw e; + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java index 964706f..960b577 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java @@ -132,7 +132,7 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E @Override public void processResult(int rc, String path, Object ctx, Stat stat) { - watching.checkBackroundRc(rc); + watching.commitWatcher(rc, true); trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.EXISTS, rc, path, null, ctx, stat, null, null, null, null, null); client.processBackgroundOperation(operationAndData, event); @@ -222,8 +222,9 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E private Stat pathInForegroundStandard(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground"); - Stat returnStat = watching.callWithRetry + Stat returnStat = RetryLoop.callWithRetry ( + client.getZookeeperClient(), new Callable<Stat>() { @Override @@ -237,6 +238,8 @@ class ExistsBuilderImpl implements ExistsBuilder, BackgroundOperation<String>, E else { returnStat = client.getZooKeeper().exists(path, watching.getWatcher(path)); + int rc = (returnStat != null) ? KeeperException.NoNodeException.Code.OK.intValue() : KeeperException.NoNodeException.Code.NONODE.intValue(); + watching.commitWatcher(rc, true); } return returnStat; } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java index 0b1bb07..000c911 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.imps; import com.google.common.collect.Lists; +import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.BackgroundPathable; @@ -30,6 +31,7 @@ import org.apache.curator.framework.api.Pathable; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.WatchPathable; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import java.util.List; @@ -167,7 +169,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< @Override public void processResult(int rc, String path, Object o, List<String> strings, Stat stat) { - watching.checkBackroundRc(rc); + watching.commitWatcher(rc, false); trace.commit(); if ( strings == null ) { @@ -214,8 +216,9 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< private List<String> pathInForeground(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("GetChildrenBuilderImpl-Foreground"); - List<String> children = watching.callWithRetry + List<String> children = RetryLoop.callWithRetry ( + client.getZookeeperClient(), new Callable<List<String>>() { @Override @@ -229,6 +232,7 @@ class GetChildrenBuilderImpl implements GetChildrenBuilder, BackgroundOperation< else { children = client.getZooKeeper().getChildren(path, watching.getWatcher(path), responseStat); + watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false); } return children; } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java index 3a210b8..1ab9043 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -19,9 +19,11 @@ package org.apache.curator.framework.imps; +import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; @@ -206,7 +208,7 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - watching.checkBackroundRc(rc); + watching.commitWatcher(rc, false); trace.commit(); CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.GET_CONFIG, rc, path, null, ctx, stat, data, null, null, null, null); client.processBackgroundOperation(operationAndData, event); @@ -232,8 +234,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati TimeTrace trace = client.getZookeeperClient().startTracer("GetConfigBuilderImpl-Foreground"); try { - return watching.callWithRetry + return RetryLoop.callWithRetry ( + client.getZookeeperClient(), new Callable<byte[]>() { @Override @@ -243,7 +246,9 @@ public class GetConfigBuilderImpl implements GetConfigBuilder, BackgroundOperati { return client.getZooKeeper().getConfig(true, stat); } - return client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat); + byte[] config = client.getZooKeeper().getConfig(watching.getWatcher(ZooDefs.CONFIG_NODE), stat); + watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false); + return config; } } ); http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 5528138..bae126c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.curator.framework.imps; +import org.apache.curator.RetryLoop; import org.apache.curator.TimeTrace; import org.apache.curator.framework.api.*; import org.apache.curator.utils.ThreadUtils; @@ -238,7 +239,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - watching.checkBackroundRc(rc); + watching.commitWatcher(rc, false); trace.commit(); if ( decompress && (data != null) ) { @@ -294,8 +295,9 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, private byte[] pathInForeground(final String path) throws Exception { TimeTrace trace = client.getZookeeperClient().startTracer("GetDataBuilderImpl-Foreground"); - byte[] responseData = watching.callWithRetry + byte[] responseData = RetryLoop.callWithRetry ( + client.getZookeeperClient(), new Callable<byte[]>() { @Override @@ -309,6 +311,7 @@ class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, else { responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat); + watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false); } return responseData; } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java index 73ea38e..3d69e5d 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java @@ -41,14 +41,13 @@ class OperationAndData<T> implements Delayed, RetrySleeper private final AtomicLong ordinal = new AtomicLong(); private final Object context; private final boolean connectionRequired; - private final Watching watching; interface ErrorCallback<T> { void retriesExhausted(OperationAndData<T> operationAndData); } - OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired, Watching watching) + OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired) { this.operation = operation; this.data = data; @@ -56,7 +55,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper this.errorCallback = errorCallback; this.context = context; this.connectionRequired = connectionRequired; - this.watching = watching; reset(); } @@ -68,7 +66,7 @@ class OperationAndData<T> implements Delayed, RetrySleeper OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, Watching watching) { - this(operation, data, callback, errorCallback, context, true, watching); + this(operation, data, callback, errorCallback, context, true); } Object getContext() @@ -117,14 +115,6 @@ class OperationAndData<T> implements Delayed, RetrySleeper return operation; } - void resetCurrentWatcher() - { - if ( watching != null ) - { - watching.resetCurrentWatcher(); - } - } - @Override public void sleepFor(long time, TimeUnit unit) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index c2d4d8e..27a3c0f 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -209,7 +209,7 @@ public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWat } client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), - errorCallback, backgrounding.getContext(), !local, null), null); + errorCallback, backgrounding.getContext(), !local), null); } private void pathInForeground(final String path) throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java index 568f308..daa5dd3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java @@ -19,11 +19,9 @@ package org.apache.curator.framework.imps; -import org.apache.curator.RetryLoop; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import java.util.concurrent.Callable; class Watching { @@ -77,14 +75,6 @@ class Watching namespaceWatcher = new NamespaceWatcher(client, curatorWatcher, unfixedPath); } - if ( namespaceWatcher != null ) - { - if ( client.getWatcherRemovalManager() != null ) - { - client.getWatcherRemovalManager().add(namespaceWatcher); - } - } - return namespaceWatcher; } @@ -98,33 +88,24 @@ class Watching return watched; } - <T> T callWithRetry(Callable<T> proc) throws Exception + void commitWatcher(int rc, boolean isExists) { - resetCurrentWatcher(); - try - { - return RetryLoop.callWithRetry(client.getZookeeperClient(), proc); - } - catch ( Exception e ) + boolean doCommit = false; + if ( isExists ) { - resetCurrentWatcher(); - throw e; + doCommit = ((rc == KeeperException.Code.OK.intValue()) || (rc == KeeperException.Code.NONODE.intValue())); } - } - - void resetCurrentWatcher() - { - if ( (namespaceWatcher != null) && (client.getWatcherRemovalManager() != null) ) + else { - client.getWatcherRemovalManager().noteTriggeredWatcher(namespaceWatcher); + doCommit = (rc == KeeperException.Code.OK.intValue()); } - } - void checkBackroundRc(int rc) - { - if ( rc != KeeperException.Code.OK.intValue() ) + if ( doCommit && (namespaceWatcher != null) ) { - resetCurrentWatcher(); + if ( client.getWatcherRemovalManager() != null ) + { + client.getWatcherRemovalManager().add(namespaceWatcher); + } } } } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java new file mode 100644 index 0000000..aa759ee --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCleanState.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.test.WatchersDebug; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.ZooKeeper; +import java.util.concurrent.Callable; + +public class TestCleanState +{ + public static void closeAndTestClean(CuratorFramework client) + { + if ( client == null ) + { + return; + } + + try + { + CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client; + EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker(); + if ( ensembleTracker != null ) + { + while ( ensembleTracker.hasOutstanding() ) + { + Thread.sleep(100); + } + ensembleTracker.close(); + } + ZooKeeper zooKeeper = internalClient.getZooKeeper(); + if ( zooKeeper != null ) + { + if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 ) + { + throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper)); + } + if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 ) + { + throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper)); + } + if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 ) + { + throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper)); + } + } + } + catch ( IllegalStateException ignore ) + { + // client already closed + } + catch ( Exception e ) + { + e.printStackTrace(); // not sure what to do here + } + finally + { + CloseableUtils.closeQuietly(client); + } + } + + public static void test(CuratorFramework client, Callable<Void> proc) throws Exception + { + boolean succeeded = false; + try + { + proc.call(); + succeeded = true; + } + finally + { + if ( succeeded ) + { + closeAndTestClean(client); + } + else + { + CloseableUtils.closeQuietly(client); + } + } + } + + private TestCleanState() + { + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java index cdb625d..9c405a2 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java @@ -27,18 +27,141 @@ import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.test.WatchersDebug; -import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.testng.Assert; import org.testng.annotations.Test; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; public class TestWatcherRemovalManager extends BaseClassForTests { @Test + public void testSameWatcherDifferentPaths1Triggered() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + final CountDownLatch latch = new CountDownLatch(1); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + latch.countDown(); + } + }; + removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f"); + removerClient.create().creatingParentsIfNeeded().forPath("/d/e/f"); + + Timing timing = new Timing(); + Assert.assertTrue(timing.awaitLatch(latch)); + timing.sleepABit(); + + removerClient.removeWatchers(); + } + finally + { + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testSameWatcherDifferentPaths() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.checkExists().usingWatcher(watcher).forPath("/d/e/f"); + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 2); + removerClient.removeWatchers(); + } + finally + { + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testSameWatcherDifferentKinds1Triggered() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + final CountDownLatch latch = new CountDownLatch(1); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + latch.countDown(); + } + }; + + removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c"); + removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.getData().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.setData().forPath("/a/b/c", "new".getBytes()); + + Timing timing = new Timing(); + Assert.assertTrue(timing.awaitLatch(latch)); + timing.sleepABit(); + + removerClient.removeWatchers(); + } + finally + { + TestCleanState.closeAndTestClean(client); + } + } + + @Test + public void testSameWatcherDifferentKinds() throws Exception + { + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + + removerClient.create().creatingParentsIfNeeded().forPath("/a/b/c"); + removerClient.checkExists().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.getData().usingWatcher(watcher).forPath("/a/b/c"); + removerClient.removeWatchers(); + } + finally + { + TestCleanState.closeAndTestClean(client); + } + } + + @Test public void testWithRetry() throws Exception { server.stop(); @@ -68,7 +191,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -105,7 +228,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -134,47 +257,50 @@ public class TestWatcherRemovalManager extends BaseClassForTests { // expected } - Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + removerClient.removeWatchers(); } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @Test public void testMissingNodeInBackground() throws Exception { - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); - try + final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + Callable<Void> proc = new Callable<Void>() { - client.start(); - WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); - Watcher w = new Watcher() + @Override + public Void call() throws Exception { - @Override - public void process(WatchedEvent event) + client.start(); + WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); + Watcher w = new Watcher() { - // NOP - } - }; - final CountDownLatch latch = new CountDownLatch(1); - BackgroundCallback callback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + @Override + public void process(WatchedEvent event) + { + // NOP + } + }; + final CountDownLatch latch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() { - latch.countDown(); - } - }; - removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three"); - Assert.assertTrue(new Timing().awaitLatch(latch)); - Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); - } - finally - { - CloseableUtils.closeQuietly(client); - } + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + latch.countDown(); + } + }; + removerClient.getData().usingWatcher(w).inBackground(callback).forPath("/one/two/three"); + Assert.assertTrue(new Timing().awaitLatch(latch)); + Assert.assertEquals(removerClient.getWatcherRemovalManager().getEntries().size(), 0); + removerClient.removeWatchers(); + return null; + } + }; + TestCleanState.test(client, proc); } @Test @@ -188,7 +314,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -203,7 +329,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -222,7 +348,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -241,7 +367,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -252,6 +378,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests try { client.start(); + client.create().forPath("/test"); WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework(); @@ -264,14 +391,15 @@ public class TestWatcherRemovalManager extends BaseClassForTests } }; - removerClient.getData().usingWatcher(watcher).forPath("/"); + removerClient.getData().usingWatcher(watcher).forPath("/test"); Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1); - removerClient.getData().usingWatcher(watcher).forPath("/"); + removerClient.getData().usingWatcher(watcher).forPath("/test"); Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1); + removerClient.removeWatchers(); } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -308,7 +436,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } @@ -364,7 +492,7 @@ public class TestWatcherRemovalManager extends BaseClassForTests } finally { - CloseableUtils.closeQuietly(client); + TestCleanState.closeAndTestClean(client); } } http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/pom.xml ---------------------------------------------------------------------- diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml index 17414c2..0443adc 100644 --- a/curator-recipes/pom.xml +++ b/curator-recipes/pom.xml @@ -52,6 +52,13 @@ <dependency> <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java deleted file mode 100644 index f90f463..0000000 --- a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.imps; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.test.WatchersDebug; -import org.apache.curator.utils.CloseableUtils; -import org.apache.zookeeper.ZooKeeper; - -public class TestCleanState -{ - public static void closeAndTestClean(CuratorFramework client) - { - if ( client == null ) - { - return; - } - - try - { - CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client; - EnsembleTracker ensembleTracker = internalClient.getEnsembleTracker(); - if ( ensembleTracker != null ) - { - ensembleTracker.close(); - } - ZooKeeper zooKeeper = internalClient.getZooKeeper(); - if ( zooKeeper != null ) - { - if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 ) - { - throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper)); - } - if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 ) - { - throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper)); - } - if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 ) - { - throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper)); - } - } - } - catch ( IllegalStateException ignore ) - { - // client already closed - } - catch ( Exception e ) - { - e.printStackTrace(); // not sure what to do here - } - finally - { - CloseableUtils.closeQuietly(client); - } - } - - private TestCleanState() - { - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/f59f23c7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c89092d..384b310 100644 --- a/pom.xml +++ b/pom.xml @@ -322,6 +322,13 @@ <dependency> <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <type>test-jar</type> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${project.version}</version> </dependency>