[ https://issues.apache.org/jira/browse/SOLR-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263112#comment-15263112 ]
ASF GitHub Bot commented on SOLR-8323: -------------------------------------- Github user dragonsinth commented on a diff in the pull request: https://github.com/apache/lucene-solr/pull/32#discussion_r61509937 --- Diff: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java --- @@ -1066,32 +1079,201 @@ public static String getCollectionPath(String coll) { return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; } - public void addCollectionWatch(String coll) { - if (interestingCollections.add(coll)) { - LOG.info("addZkWatch [{}]", coll); - new StateWatcher(coll).refreshAndWatch(false); + /** + * Notify this reader that a local Core is a member of a collection, and so that collection + * state should be watched. + * + * Not a public API. This method should only be called from ZkController. + * + * The number of cores per-collection is tracked, and adding multiple cores from the same + * collection does not increase the number of watches. + * + * @param collection the collection that the core is a member of + * + * @see ZkStateReader#unregisterCore(String) + */ + public void registerCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) { + reconstructState.set(true); + v = new CollectionWatch(); + } + v.coreRefCount++; + return v; + }); + if (reconstructState.get()) { + new StateWatcher(collection).refreshAndWatch(); + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Notify this reader that a local core that is a member of a collection has been closed. + * + * Not a public API. This method should only be called from ZkController. + * + * If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s + * for that collection either, the collection watch will be removed. + * + * @param collection the collection that the core belongs to + */ + public void unregisterCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + if (v.coreRefCount > 0) + v.coreRefCount--; + if (v.canBeRemoved()) { + watchedCollectionStates.remove(collection); + lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); + reconstructState.set(true); + return null; + } + return v; + }); + if (reconstructState.get()) { + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Register a CollectionStateWatcher to be called when the state of a collection changes + * + * A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher, + * it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * method. + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + AtomicBoolean watchSet = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) { + v = new CollectionWatch(); + watchSet.set(true); + } + v.stateWatchers.add(stateWatcher); + return v; + }); + if (watchSet.get()) { + new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } } } + /** + * Block until a CollectionStatePredicate returns true, or the wait times out + * + * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. + * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate the predicate to call on state changes + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) + throws InterruptedException, TimeoutException { + + final CountDownLatch latch = new CountDownLatch(1); + + CollectionStateWatcher watcher = new CollectionStateWatcher() { + @Override + public void onStateChanged(Set<String> liveNodes, DocCollection collectionState) { + if (predicate.matches(liveNodes, collectionState)) { + latch.countDown(); + } else { + registerCollectionStateWatcher(collection, this); + } + } + }; + registerCollectionStateWatcher(collection, watcher); + + try { + // check the current state + DocCollection dc = clusterState.getCollectionOrNull(collection); + if (predicate.matches(liveNodes, dc)) + return; + + // wait for the watcher predicate to return true, or time out + if (!latch.await(wait, unit)) + throw new TimeoutException(); + + } + finally { + removeCollectionStateWatcher(collection, watcher); + } + } + + /** + * Remove a watcher from a collection's watch list. + * + * This allows Zookeeper watches to be removed if there is no interest in the + * collection. + * + * @param collection the collection + * @param watcher the watcher + */ + public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + v.stateWatchers.remove(watcher); + if (v.canBeRemoved()) + return null; + return v; + }); + } + + private void notifyStateWatchers(String collection, DocCollection collectionState) { --- End diff -- I think we should pass in liveNodes; particularly in cases where we're firing a bunch of watchers, or even firing watchers on a bunch of collections at once, we can avoid the repeated volatile reads. > Add CollectionWatcher API to ZkStateReader > ------------------------------------------ > > Key: SOLR-8323 > URL: https://issues.apache.org/jira/browse/SOLR-8323 > Project: Solr > Issue Type: Improvement > Affects Versions: master > Reporter: Alan Woodward > Assignee: Alan Woodward > Attachments: SOLR-8323.patch, SOLR-8323.patch, SOLR-8323.patch, > SOLR-8323.patch > > > An API to watch for changes to collection state would be a generally useful > thing, both internally and for client use. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org For additional commands, e-mail: dev-h...@lucene.apache.org