Repository: incubator-distributedlog Updated Branches: refs/heads/master b4150fc84 -> a1c15f8ab
DL-88: remove watches when unregister children watches merge twitter's change from Sijie Guo Author: Sijie Guo <[email protected]> Author: Jordan Bull <[email protected]> Author: Leigh Stewart <[email protected]> Reviewers: Leigh Stewart <[email protected]> Closes #60 from sijie/merge/DL-88 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a1c15f8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a1c15f8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a1c15f8a Branch: refs/heads/master Commit: a1c15f8ab014417e67bd876ccb338883a9d112c8 Parents: b4150fc Author: Sijie Guo <[email protected]> Authored: Fri Dec 16 22:50:41 2016 -0800 Committer: Sijie Guo <[email protected]> Committed: Fri Dec 16 22:50:41 2016 -0800 ---------------------------------------------------------------------- .../twitter/distributedlog/BKLogHandler.java | 7 ---- .../distributedlog/BKLogReadHandler.java | 7 +++- .../distributedlog/BKLogWriteHandler.java | 6 +++- .../twitter/distributedlog/ZooKeeperClient.java | 1 + .../readahead/ReadAheadWorker.java | 2 +- .../distributedlog/zk/ZKWatcherManager.java | 36 ++++++++++++++++++-- .../distributedlog/zk/TestZKWatcherManager.java | 3 +- 7 files changed, 49 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java index 9aa3465..a6ec318 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java @@ -702,13 +702,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor } @Override - public Future<Void> asyncClose() { - // No-op - this.zooKeeperClient.getWatcherManager().unregisterChildWatcher(logMetadata.getLogSegmentsPath(), this); - return Future.Void(); - } - - @Override public Future<Void> asyncAbort() { return asyncClose(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 0bf6b84..6a8f90e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -311,7 +311,12 @@ class BKLogReadHandler extends BKLogHandler { if (null != handleCache) { handleCache.clear(); } - return BKLogReadHandler.super.asyncClose(); + // No-op + zooKeeperClient.getWatcherManager().unregisterChildWatcher( + logMetadata.getLogSegmentsPath(), + BKLogReadHandler.this, + true); + return Future.Void(); } }); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index 573679a..4665ed5 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -1263,7 +1263,11 @@ class BKLogWriteHandler extends BKLogHandler { ).flatMap(new AbstractFunction1<Void, Future<Void>>() { @Override public Future<Void> apply(Void result) { - return BKLogWriteHandler.super.asyncClose(); + zooKeeperClient.getWatcherManager().unregisterChildWatcher( + logMetadata.getLogSegmentsPath(), + BKLogWriteHandler.this, + false); + return Future.Void(); } }); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java index 912d592..9ea9e37 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java @@ -169,6 +169,7 @@ public class ZooKeeperClient { this.credentials = credentials; this.watcherManager = ZKWatcherManager.newBuilder() .name(name) + .zkc(this) .statsLogger(statsLogger.scope("watcher_manager")) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index a3fd239..5c15009 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -359,7 +359,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As running = false; this.zkc.getWatcherManager() - .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this); + .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this, true); // Aside from unfortunate naming of variables, this allows // the currently active long poll to be interrupted and completed http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java index 4068737..03b2841 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java @@ -17,8 +17,11 @@ */ package com.twitter.distributedlog.zk; +import com.twitter.distributedlog.ZooKeeperClient; import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; @@ -50,31 +53,40 @@ public class ZKWatcherManager implements Watcher { private String _name; private StatsLogger _statsLogger; + private ZooKeeperClient _zkc; public Builder name(String name) { this._name = name; return this; } + public Builder zkc(ZooKeeperClient zkc) { + this._zkc = zkc; + return this; + } + public Builder statsLogger(StatsLogger statsLogger) { this._statsLogger = statsLogger; return this; } public ZKWatcherManager build() { - return new ZKWatcherManager(_name, _statsLogger); + return new ZKWatcherManager(_name, _zkc, _statsLogger); } } private final String name; + private final ZooKeeperClient zkc; private final StatsLogger statsLogger; protected final ConcurrentMap<String, Set<Watcher>> childWatches; protected final AtomicInteger allWatchesGauge; private ZKWatcherManager(String name, + ZooKeeperClient zkc, StatsLogger statsLogger) { this.name = name; + this.zkc = zkc; this.statsLogger = statsLogger; // watches @@ -127,7 +139,7 @@ public class ZKWatcherManager implements Watcher { return this; } - public void unregisterChildWatcher(String path, Watcher watcher) { + public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) { Set<Watcher> watchers = childWatches.get(path); if (null == watchers) { logger.warn("No watchers found on path {} while unregistering child watcher {}.", @@ -141,6 +153,26 @@ public class ZKWatcherManager implements Watcher { logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path); } if (watchers.isEmpty()) { + // best-efforts to remove watches + try { + if (null != zkc && removeFromServer) { + zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (KeeperException.Code.OK.intValue() == rc) { + logger.debug("Successfully removed children watches from {}", path); + } else { + logger.debug("Encountered exception on removing children watches from {}", + path, KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } + } catch (InterruptedException e) { + logger.debug("Encountered exception on removing watches from {}", path, e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + logger.debug("Encountered exception on removing watches from {}", path, e); + } childWatches.remove(path, watchers); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a1c15f8a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java index ee00ab9..3ad181d 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java @@ -34,6 +34,7 @@ public class TestZKWatcherManager { public void testRegisterUnregisterWatcher() throws Exception { ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder() .name("test-register-unregister-watcher") + .zkc(null) .statsLogger(NullStatsLogger.INSTANCE) .build(); String path = "/test-register-unregister-watcher"; @@ -71,7 +72,7 @@ public class TestZKWatcherManager { assertEquals(event2, events.get(1)); // unregister watcher - watcherManager.unregisterChildWatcher(path, watcher); + watcherManager.unregisterChildWatcher(path, watcher, true); assertEquals(0, watcherManager.childWatches.size()); }
