This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch branch_9_7
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9_7 by this push:
     new c7f4fa592b8 SOLR-17391: Fixed thread pool misconfiguration regression 
(#2619)
c7f4fa592b8 is described below

commit c7f4fa592b81d1af361aa9971a23fe4d29cec84c
Author: Pierre Salagnac <[email protected]>
AuthorDate: Thu Aug 8 13:30:36 2024 -0400

    SOLR-17391: Fixed thread pool misconfiguration regression (#2619)
    
    Shard splits and concurrent/large collection backup/restore performance was 
serial; it should have been happening 5 at a time.  UpdateLog replay was also a 
little suboptimal in its thread usage.
    
    The new multiThreaded mode (concurrent segment search) should be improved as 
well.
    
    Enhance OrderedExecutorTest to test realistically.
    
    Co-authored-by: David Smiley <[email protected]>
---
 solr/CHANGES.txt                                   |  4 ++
 .../org/apache/solr/util/OrderedExecutorTest.java  | 28 ++++++-----
 .../org/apache/solr/common/util/ExecutorUtil.java  | 33 +++++++++----
 .../apache/solr/common/util/ExecutorUtilTest.java  | 54 ++++++++++++++++++++++
 4 files changed, 99 insertions(+), 20 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8af26f776d6..baa1247f13d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -134,6 +134,10 @@ Bug Fixes
 
 * SOLR-17394: Detect and handle non-200 HTTP status codes for requests made 
by IndexFetcher (Jason Gerlowski)
 
+* SOLR-17391: Fixed performance regression of misconfigured threadpools from 
SOLR-16879 (Solr 9.4).
+  Shard splits and concurrent/large collection backup/restore performance was 
serial.  UpdateLog
+  replay was a little suboptimal in thread usage too.  (Pierre Salagnac, Hakan 
Özler, David Smiley)
+
 Dependency Upgrades
 ---------------------
 * PR#2512: Update dependency com.carrotsearch:hppc to v0.10.0 (solrbot)
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java 
b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 4b49baa6e01..02da580b2bf 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.solr.SolrTestCase;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,10 +38,19 @@ import org.slf4j.LoggerFactory;
 public class OrderedExecutorTest extends SolrTestCase {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private static OrderedExecutor newOrderedExecutor(int numThreads) {
+    // initialize exactly as done in CoreContainer so we test realistically
+    return new OrderedExecutor(
+        numThreads,
+        ExecutorUtil.newMDCAwareCachedThreadPool(
+            numThreads, // thread count
+            numThreads, // queue size
+            new SolrNamedThreadFactory("testOrderedExecutor")));
+  }
+
   @Test
   public void testExecutionInOrder() {
-    OrderedExecutor orderedExecutor =
-        new OrderedExecutor(10, 
ExecutorUtil.newMDCAwareCachedThreadPool("executeInOrderTest"));
+    var orderedExecutor = newOrderedExecutor(10);
     IntBox intBox = new IntBox();
     for (int i = 0; i < 100; i++) {
       orderedExecutor.execute(1, () -> intBox.value++);
@@ -53,9 +63,7 @@ public class OrderedExecutorTest extends SolrTestCase {
   public void testLockWhenQueueIsFull() {
     final ExecutorService controlExecutor =
         
ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_control");
-    final OrderedExecutor orderedExecutor =
-        new OrderedExecutor(
-            10, 
ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_test"));
+    final var orderedExecutor = newOrderedExecutor(10);
 
     try {
       // AAA and BBB events will both depend on the use of the same lockId
@@ -111,9 +119,7 @@ public class OrderedExecutorTest extends SolrTestCase {
 
     final ExecutorService controlExecutor =
         ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_control");
-    final OrderedExecutor orderedExecutor =
-        new OrderedExecutor(
-            parallelism, 
ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_test"));
+    final var orderedExecutor = newOrderedExecutor(parallelism);
 
     try {
       // distinct lockIds should be able to be used in parallel, up to the 
size of the executor,
@@ -216,8 +222,7 @@ public class OrderedExecutorTest extends SolrTestCase {
       base.put(i, i);
       run.put(i, i);
     }
-    OrderedExecutor orderedExecutor =
-        new OrderedExecutor(10, 
ExecutorUtil.newMDCAwareCachedThreadPool("testStress"));
+    var orderedExecutor = newOrderedExecutor(10);
     for (int i = 0; i < 1000; i++) {
       int key = random().nextInt(N);
       base.put(key, base.get(key) + 1);
@@ -233,8 +238,7 @@ public class OrderedExecutorTest extends SolrTestCase {
 
   @Test
   public void testMaxSize() throws InterruptedException {
-    OrderedExecutor orderedExecutor =
-        new OrderedExecutor(1, 
ExecutorUtil.newMDCAwareCachedThreadPool("single"));
+    var orderedExecutor = newOrderedExecutor(1);
 
     CountDownLatch isRunning = new CountDownLatch(1);
     CountDownLatch blockingLatch = new CountDownLatch(1);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java 
b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index c917438249a..b21a968adef 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -237,21 +237,38 @@ public class ExecutorUtil {
     return newMDCAwareCachedThreadPool(new SolrNamedThreadFactory(name));
   }
 
-  /** See {@link 
java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} */
+  /**
+   * Create a new pool of threads, with no limit for the number of threads. 
The pool has no task
+   * queue. Each submitted task is executed immediately, either by reusing an 
existing thread if one
+   * is available, or by starting a new thread. Unused threads will be closed 
after 60 seconds.
+   */
   public static ExecutorService newMDCAwareCachedThreadPool(ThreadFactory 
threadFactory) {
     return new MDCAwareThreadPoolExecutor(
         0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), 
threadFactory);
   }
 
+  /**
+   * Create a new pool of threads. Threads are created for new work if there 
is room to do so up to
+   * {@code maxThreads}. Beyond that, the queue is used up to {@code 
queueCapacity}. Beyond that,
+   * work is rejected with an exception. Unused threads will be closed after 
60 seconds.
+   */
   public static ExecutorService newMDCAwareCachedThreadPool(
       int maxThreads, int queueCapacity, ThreadFactory threadFactory) {
-    return new MDCAwareThreadPoolExecutor(
-        0,
-        maxThreads,
-        60L,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(queueCapacity),
-        threadFactory);
+    // Create an executor with same value of core size and max total size. 
With an unbounded queue,
+    // the ThreadPoolExecutor ignores the configured max value and only 
considers core pool size.
+    // Since we allow core threads to die when idle for too long, this ends in 
having a pool with
+    // lazily-initialized and cached threads.
+    MDCAwareThreadPoolExecutor executor =
+        new MDCAwareThreadPoolExecutor(
+            maxThreads,
+            maxThreads,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(queueCapacity),
+            threadFactory);
+    // Allow core threads to die
+    executor.allowCoreThreadTimeOut(true);
+    return executor;
   }
 
   @SuppressForbidden(reason = "class customizes ThreadPoolExecutor so it can 
be used instead")
diff --git 
a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java 
b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
index a9df98a296b..f797ef547a3 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -28,9 +28,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.util.NamedThreadFactory;
 import org.apache.solr.SolrTestCase;
+import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.util.TimeOut;
 import org.junit.Test;
+import org.slf4j.MDC;
 
 public class ExecutorUtilTest extends SolrTestCase {
 
@@ -108,6 +111,57 @@ public class ExecutorUtilTest extends SolrTestCase {
     }
   }
 
+  @Test
+  public void testCMDCAwareCachedThreadPool() throws Exception {
+    // 5 threads max, unbounded queue
+    ExecutorService executor =
+        ExecutorUtil.newMDCAwareCachedThreadPool(
+            5, Integer.MAX_VALUE, new NamedThreadFactory("test"));
+
+    AtomicInteger concurrentTasks = new AtomicInteger();
+    AtomicInteger maxConcurrentTasks = new AtomicInteger();
+    int taskCount = 5 + random().nextInt(100);
+    CountDownLatch latch = new CountDownLatch(5);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    for (int i = 0; i < taskCount; i++) {
+      String core = "id_" + random().nextLong();
+
+      Callable<Void> task =
+          () -> {
+            // ensure we never have too many concurrent tasks
+            int concurrent = concurrentTasks.incrementAndGet();
+            assertTrue(concurrent <= 5);
+            maxConcurrentTasks.getAndAccumulate(concurrent, Math::max);
+
+            // assert MDC context is copied from the parent thread that 
submitted the task
+            assertEquals(core, MDC.get("core"));
+
+            // The first 4 tasks to be executed will wait on the latch, and 
the 5th will
+            // release all the threads.
+            latch.countDown();
+            latch.await(1, TimeUnit.SECONDS);
+            concurrentTasks.decrementAndGet();
+            return null;
+          };
+
+      MDCLoggingContext.setCoreName(core);
+      futures.add(executor.submit(task));
+    }
+
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+
+    for (Future<Void> future : futures) {
+      // Throws an exception (and make the test fail) if an assertion failed
+      // in the subtask
+      future.get();
+    }
+
+    // assert the pool was actually multithreaded. Since we submitted many 
tasks,
+    // all the threads should have been started
+    assertEquals(5, maxConcurrentTasks.get());
+  }
+
   private static final class Worker implements Callable<Boolean> {
    // how we communicate out to our caller
     private final CountDownLatch taskStartedLatch = new CountDownLatch(1);

Reply via email to