This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 83b4d3813fd SOLR-17396: Reduce thread contention in
ZkStateReader.getCollectionProperties() (#2611)
83b4d3813fd is described below
commit 83b4d3813fd3bf99b35c568e705de286f2fe6098
Author: aparnasuresh85 <[email protected]>
AuthorDate: Thu Aug 8 14:20:00 2024 -0400
SOLR-17396: Reduce thread contention in
ZkStateReader.getCollectionProperties() (#2611)
And simplify cacheCleaner thread management.
(cherry picked from commit 697f1a75d23b055b9bbd52ed5562871a13116f5c)
---
solr/CHANGES.txt | 2 +
.../cloud/CollectionPropertiesZkStateReader.java | 128 ++++++++++-----------
2 files changed, 65 insertions(+), 65 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3a52175f5f6..273747359ad 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -22,6 +22,8 @@ Optimizations
* SOLR-17102: The VersionBucket indexing lock mechanism was replaced with
something just as fast yet
that which consumes almost no memory, saving 1MB of memory per SolrCore.
(David Smiley)
+* SOLR-17396: Reduce thread contention in
ZkStateReader.getCollectionProperties(). (Aparna Suresh, David Smiley, Paul
McArthur)
+
Bug Fixes
---------------------
(No changes)
diff --git
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
index 2bb36116afd..93ea4d9cfe3 100644
---
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
+++
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/CollectionPropertiesZkStateReader.java
@@ -25,8 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrException;
@@ -47,7 +47,6 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
private volatile boolean closed = false;
private final SolrZkClient zkClient;
- private final ZkStateReader zkStateReader;
/** Collection properties being actively watched */
private final ConcurrentHashMap<String, VersionedCollectionProps>
watchedCollectionProps =
@@ -74,15 +73,12 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory("collectionPropsNotifications"));
- private final ExecutorService notifications =
- ExecutorUtil.newMDCAwareCachedThreadPool("cachecleaner");
+ private volatile ScheduledThreadPoolExecutor cacheCleanerExecutor = null;
- // only kept to identify if the cleaner has already been started.
- private Future<?> collectionPropsCacheCleaner;
+ private final ConcurrentHashMap<String, Object> collectionLocks = new
ConcurrentHashMap<>();
public CollectionPropertiesZkStateReader(ZkStateReader zkStateReader) {
this.zkClient = zkStateReader.getZkClient();
- this.zkStateReader = zkStateReader;
}
/**
@@ -104,54 +100,58 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
* @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection,
long cacheForMillis) {
- synchronized (watchedCollectionProps) { // synchronized on the specific
collection
- Watcher watcher = null;
- if (cacheForMillis > 0) {
- watcher =
- collectionPropsWatchers.compute(
- collection,
- (c, w) ->
- w == null ? new PropsWatcher(c, cacheForMillis) :
w.renew(cacheForMillis));
- }
- VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
- boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs >
System.nanoTime();
- long untilNs =
- System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis,
TimeUnit.MILLISECONDS);
- Map<String, String> properties;
+ Watcher watcher = null; // stays null unless caching is requested (cacheForMillis > 0)
+ if (cacheForMillis > 0) {
+ watcher =
+ collectionPropsWatchers.compute(
+ collection,
+ (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) :
w.renew(cacheForMillis));
+ }
+ VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
+ boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs >
System.nanoTime();
+ long untilNs =
+ System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis,
TimeUnit.MILLISECONDS);
+ if (haveUnexpiredProps) {
+ vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+ return vprops.props;
+ }
+ // Synchronize only when properties are expired or not present
+ synchronized (getCollectionLock(collection)) {
+ // Re-check inside the synchronized block to avoid race conditions
+ vprops = watchedCollectionProps.get(collection);
+ haveUnexpiredProps = vprops != null && vprops.cacheUntilNs >
System.nanoTime();
if (haveUnexpiredProps) {
- properties = vprops.props;
vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
- } else {
- try {
- VersionedCollectionProps vcp = fetchCollectionProperties(collection,
watcher);
- properties = vcp.props;
- if (cacheForMillis > 0) {
- vcp.cacheUntilNs = untilNs;
- watchedCollectionProps.put(collection, vcp);
- } else {
- // we're synchronized on watchedCollectionProps and we can only
get here if we have
- // found an expired vprops above, so it is safe to remove the
cached value and let the
- // GC free up some mem a bit sooner.
- if (!collectionPropsObservers.containsKey(collection)) {
- watchedCollectionProps.remove(collection);
- }
+ return vprops.props;
+ }
+ try {
+ VersionedCollectionProps vcp = fetchCollectionProperties(collection,
watcher);
+ Map<String, String> properties = vcp.props;
+ if (cacheForMillis > 0) {
+ vcp.cacheUntilNs = untilNs;
+ watchedCollectionProps.put(collection, vcp);
+ } else {
+ // we're synchronized on this collection's lock and we can only get
here if we have
+ // found an expired vprops above, so it is safe to remove the cached
value and let the
+ // GC free up some mem a bit sooner.
+ if (!collectionPropsObservers.containsKey(collection)) {
+ watchedCollectionProps.remove(collection);
}
- } catch (Exception e) {
- throw new SolrException(
- SolrException.ErrorCode.SERVER_ERROR,
- "Error reading collection properties",
- SolrZkClient.checkInterrupted(e));
}
+ return properties;
+ } catch (Exception e) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Error reading collection properties",
+ SolrZkClient.checkInterrupted(e));
}
- return properties;
}
}
@Override
public void close() {
this.closed = true;
- notifications.shutdownNow();
- ExecutorUtil.shutdownAndAwaitTermination(notifications);
+ ExecutorUtil.shutdownAndAwaitTermination(cacheCleanerExecutor);
ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
}
@@ -217,7 +217,8 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
*/
void refreshAndWatch(boolean notifyWatchers) {
try {
- synchronized (watchedCollectionProps) { // making decisions based on
the result of a get...
+ synchronized (getCollectionLock(coll)) {
+ // making decisions based on the result of a get...
VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
Map<String, String> properties = vcp.props;
VersionedCollectionProps existingVcp =
watchedCollectionProps.get(coll);
@@ -262,6 +263,10 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
}
}
+ private Object getCollectionLock(String collection) {
+ return collectionLocks.computeIfAbsent(collection, k -> new Object());
+ }
+
public void registerCollectionPropsWatcher(
final String collection, CollectionPropsWatcher propsWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
@@ -300,10 +305,11 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
throws KeeperException, InterruptedException {
final String znodePath = getCollectionPropsPath(collection);
// lazy init cache cleaner once we know someone is using collection
properties.
- if (collectionPropsCacheCleaner == null) {
- synchronized (zkStateReader.getUpdateLock()) { // Double-checked locking
- if (collectionPropsCacheCleaner == null) {
- collectionPropsCacheCleaner = notifications.submit(new
CacheCleaner());
+ if (cacheCleanerExecutor == null) {
+ synchronized (this) {
+ if (cacheCleanerExecutor == null) {
+ cacheCleanerExecutor = new ScheduledThreadPoolExecutor(1);
+ cacheCleanerExecutor.scheduleAtFixedRate(new CacheCleaner(), 0, 1,
TimeUnit.MINUTES);
}
}
}
@@ -377,20 +383,12 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
private class CacheCleaner implements Runnable {
@Override
public void run() {
- while (!Thread.interrupted()) {
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- // Executor shutdown will send us an interrupt
- break;
- }
- watchedCollectionProps
- .entrySet()
- .removeIf(
- entry ->
- entry.getValue().cacheUntilNs < System.nanoTime()
- &&
!collectionPropsObservers.containsKey(entry.getKey()));
- }
+ watchedCollectionProps
+ .entrySet()
+ .removeIf(
+ entry ->
+ entry.getValue().cacheUntilNs < System.nanoTime()
+ &&
!collectionPropsObservers.containsKey(entry.getKey()));
}
}
@@ -402,10 +400,10 @@ public class CollectionPropertiesZkStateReader implements
Closeable {
v.stateWatchers.remove(watcher);
if (v.canBeRemoved()) {
// don't want this to happen in middle of other blocks that might
add it back.
- synchronized (watchedCollectionProps) {
+ synchronized (getCollectionLock(collection)) {
watchedCollectionProps.remove(collection);
+ return null;
}
- return null;
}
return v;
});