This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 83b4d3813fd SOLR-17396: Reduce thread contention in ZkStateReader.getCollectionProperties() (#2611)
83b4d3813fd is described below

commit 83b4d3813fd3bf99b35c568e705de286f2fe6098
Author: aparnasuresh85 <[email protected]>
AuthorDate: Thu Aug 8 14:20:00 2024 -0400

    SOLR-17396: Reduce thread contention in ZkStateReader.getCollectionProperties() (#2611)
    
    And simplify cacheCleaner thread management.
    
    (cherry picked from commit 697f1a75d23b055b9bbd52ed5562871a13116f5c)
---
 solr/CHANGES.txt                                   |   2 +
 .../cloud/CollectionPropertiesZkStateReader.java   | 128 ++++++++++-----------
 2 files changed, 65 insertions(+), 65 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3a52175f5f6..273747359ad 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -22,6 +22,8 @@ Optimizations
 * SOLR-17102: The VersionBucket indexing lock mechanism was replaced with something just as fast yet
   that which consumes almost no memory, saving 1MB of memory per SolrCore.  (David Smiley)
 
+* SOLR-17396: Reduce thread contention in ZkStateReader.getCollectionProperties(). (Aparna Suresh, David Smiley, Paul McArthur)
+
 Bug Fixes
 ---------------------
 (No changes)
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
index 2bb36116afd..93ea4d9cfe3 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
@@ -25,8 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.solr.common.SolrException;
@@ -47,7 +47,6 @@ public class CollectionPropertiesZkStateReader implements Closeable {
   private volatile boolean closed = false;
 
   private final SolrZkClient zkClient;
-  private final ZkStateReader zkStateReader;
 
   /** Collection properties being actively watched */
   private final ConcurrentHashMap<String, VersionedCollectionProps> 
watchedCollectionProps =
@@ -74,15 +73,12 @@ public class CollectionPropertiesZkStateReader implements Closeable {
       ExecutorUtil.newMDCAwareSingleThreadExecutor(
           new SolrNamedThreadFactory("collectionPropsNotifications"));
 
-  private final ExecutorService notifications =
-      ExecutorUtil.newMDCAwareCachedThreadPool("cachecleaner");
+  private volatile ScheduledThreadPoolExecutor cacheCleanerExecutor = null;
 
-  // only kept to identify if the cleaner has already been started.
-  private Future<?> collectionPropsCacheCleaner;
+  private final ConcurrentHashMap<String, Object> collectionLocks = new 
ConcurrentHashMap<>();
 
   public CollectionPropertiesZkStateReader(ZkStateReader zkStateReader) {
     this.zkClient = zkStateReader.getZkClient();
-    this.zkStateReader = zkStateReader;
   }
 
   /**
@@ -104,54 +100,58 @@ public class CollectionPropertiesZkStateReader implements Closeable {
    * @return a map representing the key/value properties for the collection.
    */
   public Map<String, String> getCollectionProperties(final String collection, 
long cacheForMillis) {
-    synchronized (watchedCollectionProps) { // synchronized on the specific 
collection
-      Watcher watcher = null;
-      if (cacheForMillis > 0) {
-        watcher =
-            collectionPropsWatchers.compute(
-                collection,
-                (c, w) ->
-                    w == null ? new PropsWatcher(c, cacheForMillis) : 
w.renew(cacheForMillis));
-      }
-      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
-      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > 
System.nanoTime();
-      long untilNs =
-          System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, 
TimeUnit.MILLISECONDS);
-      Map<String, String> properties;
+    Watcher watcher = null; // synchronized on the specific collection
+    if (cacheForMillis > 0) {
+      watcher =
+          collectionPropsWatchers.compute(
+              collection,
+              (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : 
w.renew(cacheForMillis));
+    }
+    VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
+    boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > 
System.nanoTime();
+    long untilNs =
+        System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, 
TimeUnit.MILLISECONDS);
+    if (haveUnexpiredProps) {
+      vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+      return vprops.props;
+    }
+    // Synchronize only when properties are expired or not present
+    synchronized (getCollectionLock(collection)) {
+      // Re-check inside the synchronized block to avoid race conditions
+      vprops = watchedCollectionProps.get(collection);
+      haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > 
System.nanoTime();
       if (haveUnexpiredProps) {
-        properties = vprops.props;
         vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
-      } else {
-        try {
-          VersionedCollectionProps vcp = fetchCollectionProperties(collection, 
watcher);
-          properties = vcp.props;
-          if (cacheForMillis > 0) {
-            vcp.cacheUntilNs = untilNs;
-            watchedCollectionProps.put(collection, vcp);
-          } else {
-            // we're synchronized on watchedCollectionProps and we can only 
get here if we have
-            // found an expired vprops above, so it is safe to remove the 
cached value and let the
-            // GC free up some mem a bit sooner.
-            if (!collectionPropsObservers.containsKey(collection)) {
-              watchedCollectionProps.remove(collection);
-            }
+        return vprops.props;
+      }
+      try {
+        VersionedCollectionProps vcp = fetchCollectionProperties(collection, 
watcher);
+        Map<String, String> properties = vcp.props;
+        if (cacheForMillis > 0) {
+          vcp.cacheUntilNs = untilNs;
+          watchedCollectionProps.put(collection, vcp);
+        } else {
+          // we're synchronized on watchedCollectionProps and we can only get 
here if we have
+          // found an expired vprops above, so it is safe to remove the cached 
value and let the
+          // GC free up some mem a bit sooner.
+          if (!collectionPropsObservers.containsKey(collection)) {
+            watchedCollectionProps.remove(collection);
           }
-        } catch (Exception e) {
-          throw new SolrException(
-              SolrException.ErrorCode.SERVER_ERROR,
-              "Error reading collection properties",
-              SolrZkClient.checkInterrupted(e));
         }
+        return properties;
+      } catch (Exception e) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Error reading collection properties",
+            SolrZkClient.checkInterrupted(e));
       }
-      return properties;
     }
   }
 
   @Override
   public void close() {
     this.closed = true;
-    notifications.shutdownNow();
-    ExecutorUtil.shutdownAndAwaitTermination(notifications);
+    ExecutorUtil.shutdownAndAwaitTermination(cacheCleanerExecutor);
     ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
   }
 
@@ -217,7 +217,8 @@ public class CollectionPropertiesZkStateReader implements Closeable {
      */
     void refreshAndWatch(boolean notifyWatchers) {
       try {
-        synchronized (watchedCollectionProps) { // making decisions based on 
the result of a get...
+        synchronized (getCollectionLock(coll)) {
+          // making decisions based on the result of a get...
           VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
           Map<String, String> properties = vcp.props;
           VersionedCollectionProps existingVcp = 
watchedCollectionProps.get(coll);
@@ -262,6 +263,10 @@ public class CollectionPropertiesZkStateReader implements Closeable {
     }
   }
 
+  private Object getCollectionLock(String collection) {
+    return collectionLocks.computeIfAbsent(collection, k -> new Object());
+  }
+
   public void registerCollectionPropsWatcher(
       final String collection, CollectionPropsWatcher propsWatcher) {
     AtomicBoolean watchSet = new AtomicBoolean(false);
@@ -300,10 +305,11 @@ public class CollectionPropertiesZkStateReader implements Closeable {
       throws KeeperException, InterruptedException {
     final String znodePath = getCollectionPropsPath(collection);
     // lazy init cache cleaner once we know someone is using collection 
properties.
-    if (collectionPropsCacheCleaner == null) {
-      synchronized (zkStateReader.getUpdateLock()) { // Double-checked locking
-        if (collectionPropsCacheCleaner == null) {
-          collectionPropsCacheCleaner = notifications.submit(new 
CacheCleaner());
+    if (cacheCleanerExecutor == null) {
+      synchronized (this) {
+        if (cacheCleanerExecutor == null) {
+          cacheCleanerExecutor = new ScheduledThreadPoolExecutor(1);
+          cacheCleanerExecutor.scheduleAtFixedRate(new CacheCleaner(), 0, 1, 
TimeUnit.MINUTES);
         }
       }
     }
@@ -377,20 +383,12 @@ public class CollectionPropertiesZkStateReader implements Closeable {
   private class CacheCleaner implements Runnable {
     @Override
     public void run() {
-      while (!Thread.interrupted()) {
-        try {
-          Thread.sleep(60000);
-        } catch (InterruptedException e) {
-          // Executor shutdown will send us an interrupt
-          break;
-        }
-        watchedCollectionProps
-            .entrySet()
-            .removeIf(
-                entry ->
-                    entry.getValue().cacheUntilNs < System.nanoTime()
-                        && 
!collectionPropsObservers.containsKey(entry.getKey()));
-      }
+      watchedCollectionProps
+          .entrySet()
+          .removeIf(
+              entry ->
+                  entry.getValue().cacheUntilNs < System.nanoTime()
+                      && 
!collectionPropsObservers.containsKey(entry.getKey()));
     }
   }
 
@@ -402,10 +400,10 @@ public class CollectionPropertiesZkStateReader implements Closeable {
           v.stateWatchers.remove(watcher);
           if (v.canBeRemoved()) {
             // don't want this to happen in middle of other blocks that might 
add it back.
-            synchronized (watchedCollectionProps) {
+            synchronized (getCollectionLock(collection)) {
               watchedCollectionProps.remove(collection);
+              return null;
             }
-            return null;
           }
           return v;
         });

Reply via email to