DL: remove watches when unregister children watches RB_ID=833858
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b571d3b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b571d3b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b571d3b4 Branch: refs/heads/merge/DL-98 Commit: b571d3b4adcc140acca881979782474c27459d34 Parents: 517c77c Author: Sijie Guo <sij...@twitter.com> Authored: Mon May 23 21:01:57 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:35:26 2016 -0800 ---------------------------------------------------------------------- .../twitter/distributedlog/ZooKeeperClient.java | 1 + .../distributedlog/zk/ZKWatcherManager.java | 34 +++++++++++++++++++- .../distributedlog/zk/TestZKWatcherManager.java | 1 + 3 files changed, 35 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java index 912d592..9ea9e37 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java @@ -169,6 +169,7 @@ public class ZooKeeperClient { this.credentials = credentials; this.watcherManager = ZKWatcherManager.newBuilder() .name(name) + .zkc(this) .statsLogger(statsLogger.scope("watcher_manager")) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java index 4068737..a24b560 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java @@ -17,8 +17,11 @@ */ package com.twitter.distributedlog.zk; +import com.twitter.distributedlog.ZooKeeperClient; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -50,31 +53,40 @@ public class ZKWatcherManager implements Watcher { private String _name; private StatsLogger _statsLogger; + private ZooKeeperClient _zkc; public Builder name(String name) { this._name = name; return this; } + public Builder zkc(ZooKeeperClient zkc) { + this._zkc = zkc; + return this; + } + public Builder statsLogger(StatsLogger statsLogger) { this._statsLogger = statsLogger; return this; } public ZKWatcherManager build() { - return new ZKWatcherManager(_name, _statsLogger); + return new ZKWatcherManager(_name, _zkc, _statsLogger); } } private final String name; + private final ZooKeeperClient zkc; private final StatsLogger statsLogger; protected final ConcurrentMap<String, Set<Watcher>> childWatches; protected final AtomicInteger allWatchesGauge; private ZKWatcherManager(String name, + ZooKeeperClient zkc, StatsLogger statsLogger) { this.name = name; + this.zkc = zkc; this.statsLogger = statsLogger; // watches @@ -141,6 +153,26 @@ public class ZKWatcherManager implements Watcher { logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path); } if (watchers.isEmpty()) { + // best-efforts to remove watches + try { + if (null != zkc) { + zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (KeeperException.Code.OK.intValue() == rc) { + logger.debug("Successfully removed children watches from {}", path); + } else { + logger.debug("Encountered exception on removing children watches from {}", + path, KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } + } catch (InterruptedException e) { + logger.debug("Encountered exception on removing watches from {}", path, e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + logger.debug("Encountered exception on removing watches from {}", path, e); + } childWatches.remove(path, watchers); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java index ee00ab9..6f269c3 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java @@ -34,6 +34,7 @@ public class TestZKWatcherManager { public void testRegisterUnregisterWatcher() throws Exception { ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder() .name("test-register-unregister-watcher") + .zkc(null) .statsLogger(NullStatsLogger.INSTANCE) .build(); String path = "/test-register-unregister-watcher";