aparnasuresh85 commented on code in PR #2585:
URL: https://github.com/apache/solr/pull/2585#discussion_r1690191452


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -909,8 +884,8 @@ public void close() {
             });
 
     ExecutorUtil.shutdownAndAwaitTermination(notifications);
-    ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
-    if (closeClient) {
+    collectionPropertiesZkStateReader.close();
+    if (closeClient && zkClient != null) {

Review Comment:
   Removed the check



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java:
##########
@@ -0,0 +1,403 @@
+package org.apache.solr.common.cloud;
+
+import static java.util.Collections.emptyMap;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader.CollectionWatch;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CollectionPropertiesZkStateReader implements SolrCloseable {
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private volatile boolean closed = false;
+
+  private final SolrZkClient zkClient;
+
+  /** Collection properties being actively watched */
+  private final ConcurrentHashMap<String, VersionedCollectionProps> 
watchedCollectionProps =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Manages ZooKeeper watchers for each collection. These watchers monitor 
changes to the
+   * properties of the collection in ZooKeeper. When a change is detected in 
ZooKeeper, the watcher
+   * triggers an update, which then notifies the relevant 
"collectionPropsObserver".
+   */
+  private final ConcurrentHashMap<String, PropsWatcher> 
collectionPropsWatchers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Manages a list of observers (listeners) for each collection. These 
observers need to be
+   * notified when the properties of the collection change. When a 
collection's properties change,
+   * all registered observers for that collection are notified by a 
"collectionPropWatcher".
+   */
+  private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>>
+      collectionPropsObservers = new ConcurrentHashMap<>();
+
+  /** Used to submit notifications to Collection Properties watchers in order 
*/
+  private final ExecutorService collectionPropsNotifications =
+      ExecutorUtil.newMDCAwareSingleThreadExecutor(
+          new SolrNamedThreadFactory("collectionPropsNotifications"));
+
+  private final ExecutorService notifications =
+      ExecutorUtil.newMDCAwareCachedThreadPool("cachecleaner");
+
+  // only kept to identify if the cleaner has already been started.
+  private Future<?> collectionPropsCacheCleaner;
+
+  public CollectionPropertiesZkStateReader(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+    assert ObjectReleaseTracker.track(this);
+  }
+
+  /**
+   * Get and cache collection properties for a given collection. If the 
collection is watched, or
+   * still cached simply return it from the cache, otherwise fetch it directly 
from zookeeper and
+   * retain the value for at least cacheForMillis milliseconds. Cached 
properties are watched in
+   * zookeeper and updated automatically. This version of {@code 
getCollectionProperties} should be
+   * used when properties need to be consulted frequently in the absence of an 
active {@link
+   * CollectionPropsWatcher}.
+   *
+   * @param collection The collection for which properties are desired
+   * @param cacheForMillis The minimum number of milliseconds to maintain a 
cache for the specified
+   *     collection's properties. Setting a {@code CollectionPropsWatcher} 
will override this value
+   *     and retain the cache for the life of the watcher. A lack of changes 
in zookeeper may allow
+   *     the caching to remain for a greater duration up to the cycle time of 
{@code CacheCleaner}.
+   *     Passing zero for this value will explicitly remove the cached copy if 
and only if it is due
+   *     to expire and no watch exists. Any positive value will extend the 
expiration time if
+   *     required.
+   * @return a map representing the key/value properties for the collection.
+   */
+  public Map<String, String> getCollectionProperties(final String collection, 
long cacheForMillis) {
+    synchronized (watchedCollectionProps) { // synchronized on the specific 
collection
+      Watcher watcher = null;
+      if (cacheForMillis > 0) {
+        watcher =
+            collectionPropsWatchers.compute(
+                collection,
+                (c, w) ->
+                    w == null ? new PropsWatcher(c, cacheForMillis) : 
w.renew(cacheForMillis));
+      }
+      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
+      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > 
System.nanoTime();
+      long untilNs =
+          System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, 
TimeUnit.MILLISECONDS);
+      Map<String, String> properties;
+      if (haveUnexpiredProps) {
+        properties = vprops.props;
+        vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+      } else {
+        try {
+          VersionedCollectionProps vcp = fetchCollectionProperties(collection, 
watcher);
+          properties = vcp.props;
+          if (cacheForMillis > 0) {
+            vcp.cacheUntilNs = untilNs;
+            watchedCollectionProps.put(collection, vcp);
+          } else {
+            // we're synchronized on watchedCollectionProps and we can only 
get here if we have
+            // found an expired vprops above, so it is safe to remove the 
cached value and let the
+            // GC free up some mem a bit sooner.
+            if (!collectionPropsObservers.containsKey(collection)) {
+              watchedCollectionProps.remove(collection);
+            }
+          }
+        } catch (Exception e) {
+          throw new SolrException(
+              SolrException.ErrorCode.SERVER_ERROR,
+              "Error reading collection properties",
+              SolrZkClient.checkInterrupted(e));
+        }
+      }
+      return properties;
+    }
+  }
+
+  @Override
+  public void close() {
+    this.closed = true;
+    notifications.shutdownNow();
+    ExecutorUtil.shutdownAndAwaitTermination(notifications);
+    ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
+
+    assert ObjectReleaseTracker.release(this);

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@solr.apache.org
For additional commands, e-mail: issues-h...@solr.apache.org

Reply via email to