This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 20e1f62e52 PHOENIX-7670 Region level threadpool for uncovered index to scan data table (#2252)
20e1f62e52 is described below

commit 20e1f62e52df8de3f5ebdc940479aeae393c1def
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Aug 15 11:57:55 2025 -0700

    PHOENIX-7670 Region level threadpool for uncovered index to scan data table (#2252)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../org/apache/phoenix/query/QueryServices.java    |  4 +++
 .../apache/phoenix/query/QueryServicesOptions.java |  3 ++
 .../coprocessor/PhoenixRegionServerEndpoint.java   | 32 ++++++++++++++++++++++
 .../UncoveredGlobalIndexRegionScanner.java         | 19 ++++---------
 .../hbase/index/parallel/ThreadPoolManager.java    | 10 +++++++
 .../apache/phoenix/end2end/BasePermissionsIT.java  |  1 +
 6 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e3f494897d..000f5a596a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -627,6 +627,10 @@ public interface QueryServices extends SQLCloseable {
   String PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
     "phoenix.cdc.stream.partition.expiry.min.age.ms";
 
+  String PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE = "phoenix.uncovered.index.threads.max";
+  String PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC =
+    "phoenix.uncovered.index.threads.keepalive.sec";
+
   /**
    * Get executor service used for parallel scans
    */
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f6f44c23c1..8e4c7febf5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -501,6 +501,9 @@ public class QueryServicesOptions {
   public static final long DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
     30 * 60 * 60 * 1000; // 30 hours
 
+  public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE = 512;
+  public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC = 60; // 1min
+
   private final Configuration config;
 
   private QueryServicesOptions(Configuration config) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index fabc28e712..bdddf0b10b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -31,8 +31,14 @@ import org.apache.phoenix.cache.ServerMetadataCacheImpl;
 import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
@@ -47,11 +53,23 @@ public class PhoenixRegionServerEndpoint extends
   private MetricsMetadataCachingSource metricsSource;
   protected Configuration conf;
 
+  // regionserver level thread pool used by Uncovered Indexes to scan data table rows
+  private static TaskRunner uncoveredIndexThreadPool;
+
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
     this.conf = env.getConfiguration();
     this.metricsSource =
       MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
+    initUncoveredIndexThreadPool(this.conf);
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    if (uncoveredIndexThreadPool != null) {
+      uncoveredIndexThreadPool
+        .stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool.");
+    }
   }
 
   @Override
@@ -133,4 +151,18 @@ public class PhoenixRegionServerEndpoint extends
     return ServerMetadataCacheImpl.getInstance(conf);
   }
 
+  public static TaskRunner getUncoveredIndexThreadPool() {
+    return uncoveredIndexThreadPool;
+  }
+
+  private static void initUncoveredIndexThreadPool(Configuration conf) {
+    uncoveredIndexThreadPool = new WaitForCompletionTaskRunner(
+      ThreadPoolManager.getExecutor(new ThreadPoolBuilder("Uncovered Global Index", conf)
+        .setMaxThread(QueryServices.PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE,
+          QueryServicesOptions.DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE)
+        .setCoreTimeout(QueryServices.PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC,
+          QueryServicesOptions.DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC)));
+    LOGGER.info("Initialized region level thread pool for Uncovered Global Indexes.");
+  }
+
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index b97647e0b3..e0c9f6298f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.coprocessor;
 
 import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHYSICAL_DATA_TABLE_NAME;
-import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -43,9 +42,6 @@ import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.Task;
 import org.apache.phoenix.hbase.index.parallel.TaskBatch;
 import org.apache.phoenix.hbase.index.parallel.TaskRunner;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
@@ -66,7 +62,6 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
   private static final Logger LOGGER =
     LoggerFactory.getLogger(UncoveredGlobalIndexRegionScanner.class);
   public static final String NUM_CONCURRENT_INDEX_THREADS_CONF_KEY = "phoenix.index.threads.max";
-  public static final int DEFAULT_CONCURRENT_INDEX_THREADS = 16;
   public static final String INDEX_ROW_COUNTS_PER_TASK_CONF_KEY =
     "phoenix.index.row.count.per.task";
   public static final int DEFAULT_INDEX_ROW_COUNTS_PER_TASK = 2048;
@@ -74,7 +69,6 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
   protected byte[][] regionEndKeys;
   protected final Table dataHTable;
   protected final int rowCountPerTask;
-  protected final TaskRunner pool;
   protected String exceptionMessage;
   protected final HTableFactory hTableFactory;
 
@@ -96,11 +90,6 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
     hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
     rowCountPerTask =
       config.getInt(INDEX_ROW_COUNTS_PER_TASK_CONF_KEY, DEFAULT_INDEX_ROW_COUNTS_PER_TASK);
-
-    pool = new WaitForCompletionTaskRunner(ThreadPoolManager
-      .getExecutor(new ThreadPoolBuilder("Uncovered Global Index", env.getConfiguration())
-        .setMaxThread(NUM_CONCURRENT_INDEX_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_THREADS)
-        .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
     byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
     dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
     regionEndKeys =
@@ -121,7 +110,6 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
     if (dataHTable != null) {
       dataHTable.close();
     }
-    this.pool.stop("UncoveredGlobalIndexRegionScanner is closing");
   }
 
   protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
@@ -178,7 +166,12 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
     Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
     try {
       LOGGER.debug("Waiting on index tasks to complete...");
-      resultsAndFutures = this.pool.submitUninterruptible(tasks);
+      TaskRunner pool = PhoenixRegionServerEndpoint.getUncoveredIndexThreadPool();
+      if (pool == null) {
+        throw new IOException(
+          "PhoenixRegionServerEndpoint should be loaded to use Uncovered Indexes.");
+      }
+      resultsAndFutures = pool.submitUninterruptible(tasks);
     } catch (ExecutionException e) {
       throw new RuntimeException(
         "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
index d79c50c394..4400339101 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
@@ -56,6 +56,16 @@ public class ThreadPoolManager {
     return getExecutor(builder, env.getSharedData());
   }
 
+  /**
+   * Used by PhoenixRegionServerEndpoint to create a regionserver level pool for Uncovered Indexes.
+   * Since this pool will be tied to the regionserver's lifecycle, we don't need to use sharedData.
+   */
+  public static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder) {
+    ShutdownOnUnusedThreadPoolExecutor pool = getDefaultExecutor(builder);
+    pool.addReference();
+    return pool;
+  }
+
   static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
     Map<String, Object> poolCache) {
     ThreadPoolExecutor pool = (ThreadPoolExecutor) poolCache.get(builder.getName());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 4cfdffd6a8..e0d3bc1b35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -186,6 +186,7 @@ public abstract class BasePermissionsIT extends BaseTest {
     configureNamespacesOnServer(config, isNamespaceMapped);
     configureStatsConfigurations(config);
     config.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true);
+    BaseTest.setPhoenixRegionServerEndpoint(config);
 
     testUtil.startMiniCluster(1);
     superUser1 = User.createUserForTesting(config, SUPER_USER, new String[0]);

Reply via email to