[ 
https://issues.apache.org/jira/browse/SOLR-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263112#comment-15263112
 ] 

ASF GitHub Bot commented on SOLR-8323:
--------------------------------------

Github user dragonsinth commented on a diff in the pull request:

    https://github.com/apache/lucene-solr/pull/32#discussion_r61509937
  
    --- Diff: 
solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java ---
    @@ -1066,32 +1079,201 @@ public static String getCollectionPath(String 
coll) {
         return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
       }
     
    -  public void addCollectionWatch(String coll) {
    -    if (interestingCollections.add(coll)) {
    -      LOG.info("addZkWatch [{}]", coll);
    -      new StateWatcher(coll).refreshAndWatch(false);
    +  /**
    +   * Notify this reader that a local Core is a member of a collection, and 
so that collection
    +   * state should be watched.
    +   *
    +   * Not a public API.  This method should only be called from 
ZkController.
    +   *
    +   * The number of cores per-collection is tracked, and adding multiple 
cores from the same
    +   * collection does not increase the number of watches.
    +   *
    +   * @param collection the collection that the core is a member of
    +   *
    +   * @see ZkStateReader#unregisterCore(String)
    +   */
    +  public void registerCore(String collection) {
    +    AtomicBoolean reconstructState = new AtomicBoolean(false);
    +    collectionWatches.compute(collection, (k, v) -> {
    +      if (v == null) {
    +        reconstructState.set(true);
    +        v = new CollectionWatch();
    +      }
    +      v.coreRefCount++;
    +      return v;
    +    });
    +    if (reconstructState.get()) {
    +      new StateWatcher(collection).refreshAndWatch();
    +      synchronized (getUpdateLock()) {
    +        constructState();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Notify this reader that a local core that is a member of a collection 
has been closed.
    +   *
    +   * Not a public API.  This method should only be called from 
ZkController.
    +   *
    +   * If no cores are registered for a collection, and there are no {@link 
CollectionStateWatcher}s
    +   * for that collection either, the collection watch will be removed.
    +   *
    +   * @param collection the collection that the core belongs to
    +   */
    +  public void unregisterCore(String collection) {
    +    AtomicBoolean reconstructState = new AtomicBoolean(false);
    +    collectionWatches.compute(collection, (k, v) -> {
    +      if (v == null)
    +        return null;
    +      if (v.coreRefCount > 0)
    +        v.coreRefCount--;
    +      if (v.canBeRemoved()) {
    +        watchedCollectionStates.remove(collection);
    +        lazyCollectionStates.put(collection, new 
LazyCollectionRef(collection));
    +        reconstructState.set(true);
    +        return null;
    +      }
    +      return v;
    +    });
    +    if (reconstructState.get()) {
    +      synchronized (getUpdateLock()) {
    +        constructState();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Register a CollectionStateWatcher to be called when the state of a 
collection changes
    +   *
    +   * A given CollectionStateWatcher will be only called once.  If you want 
to have a persistent watcher,
    +   * it should register itself again in its {@link 
CollectionStateWatcher#onStateChanged(Set, DocCollection)}
    +   * method.
    +   */
    +  public void registerCollectionStateWatcher(String collection, 
CollectionStateWatcher stateWatcher) {
    +    AtomicBoolean watchSet = new AtomicBoolean(false);
    +    collectionWatches.compute(collection, (k, v) -> {
    +      if (v == null) {
    +        v = new CollectionWatch();
    +        watchSet.set(true);
    +      }
    +      v.stateWatchers.add(stateWatcher);
    +      return v;
    +    });
    +    if (watchSet.get()) {
    +      new StateWatcher(collection).refreshAndWatch();
           synchronized (getUpdateLock()) {
             constructState();
           }
         }
       }
     
    +  /**
    +   * Block until a CollectionStatePredicate returns true, or the wait 
times out
    +   *
    +   * Note that the predicate may be called again even after it has 
returned true, so
    +   * implementors should avoid changing state within the predicate call 
itself.
    +   *
    +   * @param collection the collection to watch
    +   * @param wait       how long to wait
    +   * @param unit       the units of the wait parameter
    +   * @param predicate  the predicate to call on state changes
    +   * @throws InterruptedException on interrupt
    +   * @throws TimeoutException on timeout
    +   */
    +  public void waitForState(final String collection, long wait, TimeUnit 
unit, CollectionStatePredicate predicate)
    +      throws InterruptedException, TimeoutException {
    +
    +    final CountDownLatch latch = new CountDownLatch(1);
    +
    +    CollectionStateWatcher watcher = new CollectionStateWatcher() {
    +      @Override
    +      public void onStateChanged(Set<String> liveNodes, DocCollection 
collectionState) {
    +        if (predicate.matches(liveNodes, collectionState)) {
    +          latch.countDown();
    +        } else {
    +          registerCollectionStateWatcher(collection, this);
    +        }
    +      }
    +    };
    +    registerCollectionStateWatcher(collection, watcher);
    +
    +    try {
    +      // check the current state
    +      DocCollection dc = clusterState.getCollectionOrNull(collection);
    +      if (predicate.matches(liveNodes, dc))
    +        return;
    +
    +      // wait for the watcher predicate to return true, or time out
    +      if (!latch.await(wait, unit))
    +        throw new TimeoutException();
    +
    +    }
    +    finally {
    +      removeCollectionStateWatcher(collection, watcher);
    +    }
    +  }
    +
    +  /**
    +   * Remove a watcher from a collection's watch list.
    +   *
    +   * This allows Zookeeper watches to be removed if there is no interest 
in the
    +   * collection.
    +   *
    +   * @param collection the collection
    +   * @param watcher    the watcher
    +   */
    +  public void removeCollectionStateWatcher(String collection, 
CollectionStateWatcher watcher) {
    +    collectionWatches.compute(collection, (k, v) -> {
    +      if (v == null)
    +        return null;
    +      v.stateWatchers.remove(watcher);
    +      if (v.canBeRemoved())
    +        return null;
    +      return v;
    +    });
    +  }
    +
    +  private void notifyStateWatchers(String collection, DocCollection 
collectionState) {
    --- End diff --
    
    I think we should pass in liveNodes; particularly in cases where we're 
firing a bunch of watchers, or even firing watchers on a bunch of collections 
at once, we can avoid the repeated volatile reads.


> Add CollectionWatcher API to ZkStateReader
> ------------------------------------------
>
>                 Key: SOLR-8323
>                 URL: https://issues.apache.org/jira/browse/SOLR-8323
>             Project: Solr
>          Issue Type: Improvement
>    Affects Versions: master
>            Reporter: Alan Woodward
>            Assignee: Alan Woodward
>         Attachments: SOLR-8323.patch, SOLR-8323.patch, SOLR-8323.patch, 
> SOLR-8323.patch
>
>
> An API to watch for changes to collection state would be a generally useful 
> thing, both internally and for client use.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
For additional commands, e-mail: dev-h...@lucene.apache.org

Reply via email to