This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 20e1f62e52 PHOENIX-7670 Region level threadpool for uncovered index to scan data table (#2252)
20e1f62e52 is described below
commit 20e1f62e52df8de3f5ebdc940479aeae393c1def
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Aug 15 11:57:55 2025 -0700
PHOENIX-7670 Region level threadpool for uncovered index to scan data table (#2252)
Co-authored-by: Palash Chauhan <[email protected]>
---
.../org/apache/phoenix/query/QueryServices.java | 4 +++
.../apache/phoenix/query/QueryServicesOptions.java | 3 ++
.../coprocessor/PhoenixRegionServerEndpoint.java | 32 ++++++++++++++++++++++
.../UncoveredGlobalIndexRegionScanner.java | 19 ++++---------
.../hbase/index/parallel/ThreadPoolManager.java | 10 +++++++
.../apache/phoenix/end2end/BasePermissionsIT.java | 1 +
6 files changed, 56 insertions(+), 13 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e3f494897d..000f5a596a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -627,6 +627,10 @@ public interface QueryServices extends SQLCloseable {
String PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
"phoenix.cdc.stream.partition.expiry.min.age.ms";
+ String PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE =
"phoenix.uncovered.index.threads.max";
+ String PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC =
+ "phoenix.uncovered.index.threads.keepalive.sec";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f6f44c23c1..8e4c7febf5 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -501,6 +501,9 @@ public class QueryServicesOptions {
public static final long
DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
30 * 60 * 60 * 1000; // 30 hours
+ public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE = 512;
+ public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC
= 60; // 1min
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index fabc28e712..bdddf0b10b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -31,8 +31,14 @@ import org.apache.phoenix.cache.ServerMetadataCacheImpl;
import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
@@ -47,11 +53,23 @@ public class PhoenixRegionServerEndpoint extends
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
+ // regionserver level thread pool used by Uncovered Indexes to scan data
table rows
+ private static TaskRunner uncoveredIndexThreadPool;
+
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
+ initUncoveredIndexThreadPool(this.conf);
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ if (uncoveredIndexThreadPool != null) {
+ uncoveredIndexThreadPool
+ .stop("PhoenixRegionServerEndpoint is stopping. Shutting down
uncovered index threadpool.");
+ }
}
@Override
@@ -133,4 +151,18 @@ public class PhoenixRegionServerEndpoint extends
return ServerMetadataCacheImpl.getInstance(conf);
}
+ public static TaskRunner getUncoveredIndexThreadPool() {
+ return uncoveredIndexThreadPool;
+ }
+
+ private static void initUncoveredIndexThreadPool(Configuration conf) {
+ uncoveredIndexThreadPool = new WaitForCompletionTaskRunner(
+ ThreadPoolManager.getExecutor(new ThreadPoolBuilder("Uncovered Global
Index", conf)
+ .setMaxThread(QueryServices.PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE,
+ QueryServicesOptions.DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE)
+
.setCoreTimeout(QueryServices.PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC,
+
QueryServicesOptions.DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC)));
+ LOGGER.info("Initialized region level thread pool for Uncovered Global
Indexes.");
+ }
+
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index b97647e0b3..e0c9f6298f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.coprocessor;
import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHYSICAL_DATA_TABLE_NAME;
-import static
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
import java.io.IOException;
import java.util.Collection;
@@ -43,9 +42,6 @@ import
org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
import org.apache.phoenix.hbase.index.parallel.TaskRunner;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
@@ -66,7 +62,6 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
private static final Logger LOGGER =
LoggerFactory.getLogger(UncoveredGlobalIndexRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_THREADS_CONF_KEY =
"phoenix.index.threads.max";
- public static final int DEFAULT_CONCURRENT_INDEX_THREADS = 16;
public static final String INDEX_ROW_COUNTS_PER_TASK_CONF_KEY =
"phoenix.index.row.count.per.task";
public static final int DEFAULT_INDEX_ROW_COUNTS_PER_TASK = 2048;
@@ -74,7 +69,6 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
protected byte[][] regionEndKeys;
protected final Table dataHTable;
protected final int rowCountPerTask;
- protected final TaskRunner pool;
protected String exceptionMessage;
protected final HTableFactory hTableFactory;
@@ -96,11 +90,6 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
rowCountPerTask =
config.getInt(INDEX_ROW_COUNTS_PER_TASK_CONF_KEY,
DEFAULT_INDEX_ROW_COUNTS_PER_TASK);
-
- pool = new WaitForCompletionTaskRunner(ThreadPoolManager
- .getExecutor(new ThreadPoolBuilder("Uncovered Global Index",
env.getConfiguration())
- .setMaxThread(NUM_CONCURRENT_INDEX_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_THREADS)
- .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
dataHTable = hTableFactory.getTable(new ImmutableBytesPtr(dataTableName));
regionEndKeys =
@@ -121,7 +110,6 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
if (dataHTable != null) {
dataHTable.close();
}
- this.pool.stop("UncoveredGlobalIndexRegionScanner is closing");
}
protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime)
throws IOException {
@@ -178,7 +166,12 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
try {
LOGGER.debug("Waiting on index tasks to complete...");
- resultsAndFutures = this.pool.submitUninterruptible(tasks);
+ TaskRunner pool =
PhoenixRegionServerEndpoint.getUncoveredIndexThreadPool();
+ if (pool == null) {
+ throw new IOException(
+ "PhoenixRegionServerEndpoint should be loaded to use Uncovered
Indexes.");
+ }
+ resultsAndFutures = pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException(
"Should not fail on the results while using a
WaitForCompletionTaskRunner", e);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
index d79c50c394..4400339101 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/parallel/ThreadPoolManager.java
@@ -56,6 +56,16 @@ public class ThreadPoolManager {
return getExecutor(builder, env.getSharedData());
}
+ /**
+ * Used by PhoenixRegionServerEndpoint to create a regionserver level pool
for Uncovered Indexes.
+ * Since this pool will be tied to the regionserver's lifecycle, we don't
need to use sharedData.
+ */
+ public static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder
builder) {
+ ShutdownOnUnusedThreadPoolExecutor pool = getDefaultExecutor(builder);
+ pool.addReference();
+ return pool;
+ }
+
static synchronized ThreadPoolExecutor getExecutor(ThreadPoolBuilder builder,
Map<String, Object> poolCache) {
ThreadPoolExecutor pool = (ThreadPoolExecutor)
poolCache.get(builder.getName());
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 4cfdffd6a8..e0d3bc1b35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -186,6 +186,7 @@ public abstract class BasePermissionsIT extends BaseTest {
configureNamespacesOnServer(config, isNamespaceMapped);
configureStatsConfigurations(config);
config.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true);
+ BaseTest.setPhoenixRegionServerEndpoint(config);
testUtil.startMiniCluster(1);
superUser1 = User.createUserForTesting(config, SUPER_USER, new String[0]);