This is an automated email from the ASF dual-hosted git repository.
jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 20e3bded8a PHOENIX-7605 Adding ability to configure threadpool at CQSI level (#2138)
20e3bded8a is described below
commit 20e3bded8adabbb501d41a7d7cb8d7782e7cba0e
Author: ritegarg <[email protected]>
AuthorDate: Wed May 14 10:46:35 2025 -0700
PHOENIX-7605 Adding ability to configure threadpool at CQSI level (#2138)
Co-authored-by: Ritesh Garg <[email protected]>
---
.../phoenix/query/ConnectionQueryServicesImpl.java | 118 +++++++--
.../apache/phoenix/query/HConnectionFactory.java | 19 ++
.../org/apache/phoenix/query/HTableFactory.java | 8 +-
.../org/apache/phoenix/query/QueryServices.java | 10 +
.../apache/phoenix/query/QueryServicesOptions.java | 26 +-
.../phoenix/jdbc/FailoverPhoenixConnectionIT.java | 31 +++
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 54 ++++
.../ConnectionQueryServicesImplThreadPoolIT.java | 285 +++++++++++++++++++++
.../java/org/apache/phoenix/query/BaseTest.java | 8 +
.../query/ConnectionQueryServicesImplTest.java | 53 +++-
10 files changed, 583 insertions(+), 29 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 762eccf054..a5998750dd 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -81,11 +81,17 @@ import static
org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_THREADS;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
-import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TIMEOUT_DURING_UPGRADE_MS;
import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
@@ -118,6 +124,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -132,6 +139,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -185,6 +193,7 @@ import
org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.ipc.RemoteException;
@@ -417,6 +426,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
private Connection invalidateMetadataCacheConnection = null;
private final Object invalidateMetadataCacheConnLock = new Object();
private MetricsMetadataCachingSource metricsMetadataCachingSource;
+ private ThreadPoolExecutor threadPoolExecutor = null;
+ private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE =
"Cannot invalidate server metadata cache on a non-server
connection";
@@ -451,9 +462,10 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
/**
* Construct a ConnectionQueryServicesImpl that represents a connection to
an HBase
* cluster.
- * @param services base services from where we derive our default
configuration
+ *
+ * @param services base services from where we derive our default
configuration
* @param connectionInfo to provide connection information
- * @param info hbase configuration properties
+ * @param info hbase configuration properties
*/
public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo
connectionInfo, Properties info) {
super(services);
@@ -477,23 +489,73 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
this.connectionInfo = connectionInfo;
// Without making a copy of the configuration we cons up, we lose some
of our properties
- // on the server side during testing.
- this.config =
HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+ // on the server side during testing. This allows the application
overridden
+ // ConfigurationFactory to inject/modify configs
+ Configuration finalConfig = HBaseFactoryProvider
+
.getConfigurationFactory().getConfiguration(config);
+ this.config = finalConfig;
+
+ if (finalConfig.getBoolean(CQSI_THREAD_POOL_ENABLED,
DEFAULT_CQSI_THREAD_POOL_ENABLED)) {
+ final int keepAlive =
finalConfig.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+ DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS);
+ final int corePoolSize =
finalConfig.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE,
+ DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE);
+ final int maxThreads =
finalConfig.getInt(CQSI_THREAD_POOL_MAX_THREADS,
+ DEFAULT_CQSI_THREAD_POOL_MAX_THREADS);
+ final int maxQueue = finalConfig.getInt(CQSI_THREAD_POOL_MAX_QUEUE,
+ DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE);
+ final String threadPoolName = connectionInfo.getPrincipal() != null
+ ? connectionInfo.getPrincipal()
+ : DEFAULT_QUERY_SERVICES_NAME;
+ // Based on implementations used in
+ // org.apache.hadoop.hbase.client.ConnectionImplementation
+ final BlockingQueue<Runnable> workQueue = new
LinkedBlockingQueue<>(maxQueue);
+ this.threadPoolExecutor =
+ new ThreadPoolExecutor(corePoolSize, maxThreads,
keepAlive, TimeUnit.SECONDS,
+ workQueue, new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("CQSI-" + threadPoolName
+ + "-" +
threadPoolNumber.incrementAndGet()
+ + "-shared-pool-%d")
+ .setUncaughtExceptionHandler(
+
Threads.LOGGING_EXCEPTION_HANDLER)
+ .build());
+ this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig
+ .getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+ DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
+ LOGGER.info("For ConnectionQueryService = {} , " +
+ "CQSI ThreadPool Configs {} = {}, {} = {}, {} =
{}, {} = {}, {} = {}",
+ threadPoolName,
+ CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+ finalConfig.get(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS),
+ CQSI_THREAD_POOL_CORE_POOL_SIZE,
+ finalConfig.get(CQSI_THREAD_POOL_CORE_POOL_SIZE),
+ CQSI_THREAD_POOL_MAX_THREADS,
+ finalConfig.get(CQSI_THREAD_POOL_MAX_THREADS),
+ CQSI_THREAD_POOL_MAX_QUEUE,
+ finalConfig.get(CQSI_THREAD_POOL_MAX_QUEUE),
+ CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+
finalConfig.get(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
+ }
+
+
LOGGER.info(
- "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} ,
{} = {} , {} = {}",
+ "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} ,
{} = {} , {} = {}, {} = {}",
HConstants.ZOOKEEPER_QUORUM,
- this.config.get(HConstants.ZOOKEEPER_QUORUM),
HConstants.CLIENT_ZOOKEEPER_QUORUM,
- this.config.get(HConstants.CLIENT_ZOOKEEPER_QUORUM),
+ finalConfig.get(HConstants.ZOOKEEPER_QUORUM),
HConstants.CLIENT_ZOOKEEPER_QUORUM,
+ finalConfig.get(HConstants.CLIENT_ZOOKEEPER_QUORUM),
HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT,
- this.config.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT),
+ finalConfig.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT),
HConstants.ZOOKEEPER_CLIENT_PORT,
- this.config.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+ finalConfig.get(HConstants.ZOOKEEPER_CLIENT_PORT),
RPCConnectionInfo.BOOTSTRAP_NODES,
- this.config.get(RPCConnectionInfo.BOOTSTRAP_NODES),
- HConstants.MASTER_ADDRS_KEY,
this.config.get(HConstants.MASTER_ADDRS_KEY),
+ finalConfig.get(RPCConnectionInfo.BOOTSTRAP_NODES),
+ HConstants.MASTER_ADDRS_KEY,
finalConfig.get(HConstants.MASTER_ADDRS_KEY),
ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
-
this.config.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+
finalConfig.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY),
+ QueryServices.CQSI_THREAD_POOL_ENABLED,
+ finalConfig.get(QueryServices.CQSI_THREAD_POOL_ENABLED));
//Set the rpcControllerFactory if it is a server side connnection.
boolean isServerSideConnection =
config.getBoolean(QueryUtil.IS_SERVER_CONNECTION, false);
@@ -501,8 +563,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
this.serverSideRPCControllerFactory = new
ServerSideRPCControllerFactory(config);
}
// set replication required parameter
- ConfigUtil.setReplicationConfigIfAbsent(this.config);
- this.props = new ReadOnlyProps(this.config.iterator());
+ ConfigUtil.setReplicationConfigIfAbsent(finalConfig);
+ this.props = new ReadOnlyProps(finalConfig.iterator());
this.userName = connectionInfo.getPrincipal();
this.user = connectionInfo.getUser();
this.latestMetaData = newEmptyMetaData();
@@ -557,17 +619,17 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
.build();
}
- if (this.config.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+ if (finalConfig.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
// "hbase.client.metrics.scope" defined on
//
org.apache.hadoop.hbase.client.MetricsConnection#METRICS_SCOPE_KEY
// however we cannot use the constant directly as long as we
support HBase 2.4 profile.
- this.config.set("hbase.client.metrics.scope",
config.get(QUERY_SERVICES_NAME));
+ finalConfig.set("hbase.client.metrics.scope",
config.get(QUERY_SERVICES_NAME));
}
if (!QueryUtil.isServerConnection(props)) {
//Start queryDistruptor everytime as log level can be change at
connection level as well, but we can avoid starting for server connections.
try {
- this.queryDisruptor = new QueryLoggerDisruptor(this.config);
+ this.queryDisruptor = new QueryLoggerDisruptor(finalConfig);
} catch (SQLException e) {
LOGGER.warn("Unable to initiate query logging service !!");
e.printStackTrace();
@@ -582,7 +644,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
private Connection openConnection(Configuration conf) throws SQLException {
Connection localConnection;
try {
- localConnection =
HBaseFactoryProvider.getHConnectionFactory().createConnection(conf);
+ localConnection = HBaseFactoryProvider.getHConnectionFactory()
+ .createConnection(conf, threadPoolExecutor);
GLOBAL_HCONNECTIONS_COUNTER.increment();
LOGGER.info("HConnection established. Stacktrace for informational
purposes: "
+ localConnection + " " + LogUtil.getCallerStackTrace());
@@ -634,7 +697,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
public Table getTable(byte[] tableName) throws SQLException {
try {
return HBaseFactoryProvider.getHTableFactory().getTable(tableName,
- connection, null);
+ connection, threadPoolExecutor);
} catch (IOException e) {
throw new SQLException(e);
}
@@ -760,6 +823,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
try {
tableStatsCache.invalidateAll();
super.close();
+ shutdownThreadPool(this.threadPoolExecutor);
} catch (SQLException e) {
if (sqlE == null) {
sqlE = e;
@@ -774,6 +838,20 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
}
+ // Based on org.apache.hadoop.hbase.client.ConnectionImplementation
+ private void shutdownThreadPool(ThreadPoolExecutor pool) {
+ if (pool != null && !pool.isShutdown()) {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ }
+ }
+ }
+
protected ConnectionQueryServices newChildQueryService() {
return new ChildQueryServices(this);
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
index c3e0eb54dc..b296b35892 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.query;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -38,6 +39,18 @@ public interface HConnectionFactory {
*/
Connection createConnection(Configuration conf) throws IOException;
+ /**
+ * Creates HConnection to access HBase clusters.
+ *
+ * @param conf object
+ * @param pool object
+ * @return A HConnection instance
+ */
+ default Connection createConnection(Configuration conf, ExecutorService
pool)
+ throws IOException {
+ return createConnection(conf);
+ }
+
/**
* Default implementation. Uses standard HBase HConnections.
*/
@@ -46,5 +59,11 @@ public interface HConnectionFactory {
public Connection createConnection(Configuration conf) throws
IOException {
return ConnectionFactory.createConnection(conf);
}
+
+ @Override
+ public Connection createConnection(Configuration conf, ExecutorService
pool)
+ throws IOException {
+ return ConnectionFactory.createConnection(conf, pool);
+ }
}
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
index 10a531f198..79607df8e7 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
@@ -47,9 +47,11 @@ public interface HTableFactory {
*/
static class HTableFactoryImpl implements HTableFactory {
@Override
- public Table getTable(byte[] tableName, Connection connection,
ExecutorService pool) throws IOException {
- // Let the HBase client manage the thread pool instead of passing
ours through
- return connection.getTable(TableName.valueOf(tableName));
+ public Table getTable(byte[] tableName, Connection connection,
ExecutorService pool)
+ throws IOException {
+ // If CQSI_THREAD_POOL_ENABLED then we pass ExecutorService
created in CQSI to
+ // HBase Client, else it is null(default), let the HBase client
manage the thread pool
+ return connection.getTable(TableName.valueOf(tableName), pool);
}
}
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index bc9320723c..772eac78c5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -384,6 +384,16 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT =
"phoenix.view.ttl.tenant_views_per_scan.limit";
// Block mutations based on cluster role record
public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED =
"phoenix.cluster.role.based.mutation.block.enabled";
+ //Enable Thread Pool Creation in CQSI to be used for HBase Client.
+ String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
+ //CQSI Thread Pool Related Configuration.
+ String CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS =
"phoenix.cqsi.thread.pool.keepalive.seconds";
+ String CQSI_THREAD_POOL_CORE_POOL_SIZE =
"phoenix.cqsi.thread.pool.core.size";
+ String CQSI_THREAD_POOL_MAX_THREADS =
"phoenix.cqsi.thread.pool.max.threads";
+ String CQSI_THREAD_POOL_MAX_QUEUE = "phoenix.cqsi.thread.pool.max.queue";
+ // Enables
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html#allowCoreThreadTimeOut-boolean-
+ String CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT
+ = "phoenix.cqsi.thread.pool.allow.core.thread.timeout";
// Before 4.15 when we created a view we included the parent table column
metadata in the view
// metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no
longer store the parent
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3bd94e4ffa..ca637dbf90 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -75,6 +75,12 @@ import static
org.apache.phoenix.query.QueryServices.METRIC_PUBLISHER_ENABLED;
import static
org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
@@ -454,6 +460,12 @@ public class QueryServicesOptions {
public static final Boolean
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
+ public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
+ public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
+ public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
+ public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25;
+ public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
+ public static final Boolean
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
private final Configuration config;
@@ -558,8 +570,18 @@ public class QueryServicesOptions {
DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX)
.setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE,
DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
.setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED,
DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED)
- .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
- .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
+ .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
+ DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
+ .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
+ DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED)
+ .setIfUnset(CQSI_THREAD_POOL_ENABLED,
DEFAULT_CQSI_THREAD_POOL_ENABLED)
+ .setIfUnset(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+ DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS)
+ .setIfUnset(CQSI_THREAD_POOL_CORE_POOL_SIZE,
DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE)
+ .setIfUnset(CQSI_THREAD_POOL_MAX_THREADS,
DEFAULT_CQSI_THREAD_POOL_MAX_THREADS)
+ .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE,
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
+ .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+ DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user
set
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index ab26000567..73da203c6c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -25,9 +25,17 @@ import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestin
import static
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement;
import static
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
@@ -48,6 +56,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -113,6 +122,12 @@ public class FailoverPhoenixConnectionIT {
haGroupName = testName.getMethodName();
clientProperties =
HighAvailabilityTestingUtility.getHATestProperties();
clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+ clientProperties.setProperty(CQSI_THREAD_POOL_ENABLED,
String.valueOf(true));
+ clientProperties.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
String.valueOf(13));
+ clientProperties.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE,
String.valueOf(17));
+ clientProperties.setProperty(CQSI_THREAD_POOL_MAX_THREADS,
String.valueOf(19));
+ clientProperties.setProperty(CQSI_THREAD_POOL_MAX_QUEUE,
String.valueOf(23));
+
clientProperties.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
String.valueOf(true));
// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
@@ -138,6 +153,22 @@ public class FailoverPhoenixConnectionIT {
}
}
+ @Test
+ public void testCQSIThreadPoolCreation() throws Exception {
+ try (Connection conn = createFailoverConnection()) {
+ FailoverPhoenixConnection failoverConn =
conn.unwrap(FailoverPhoenixConnection.class);
+
+ // verify connection#1
+ ConnectionQueryServices cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup),
clientProperties);
+ ConnectionQueryServices cqsiFromConn =
failoverConn.getWrappedConnection().getQueryServices();
+ // Check that same ThreadPoolExecutor object is used for CQSIs
+ Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi),
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
/**
* Test Phoenix connection creation and basic operations with HBase
cluster pair.
*/
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index aa2bf52a9a..7b98a50e3a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -34,13 +34,22 @@ import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALL
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTION_TIME;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
+import static
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -49,8 +58,10 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.test.GenericTestUtils;
@@ -62,6 +73,8 @@ import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.JDBCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -107,6 +120,13 @@ public class ParallelPhoenixConnectionIT {
GLOBAL_PROPERTIES.setProperty(AUTO_COMMIT_ATTRIB, "true");
GLOBAL_PROPERTIES.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS,
String.valueOf(true));
GLOBAL_PROPERTIES.setProperty(QueryServices.LOG_LEVEL,
LogLevel.DEBUG.name()); //Need logging for query metrics
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ENABLED,
String.valueOf(true));
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
String.valueOf(13));
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE,
String.valueOf(17));
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS,
String.valueOf(19));
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE,
String.valueOf(23));
+
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
String.valueOf(true));
+
}
@AfterClass
@@ -172,6 +192,40 @@ public class ParallelPhoenixConnectionIT {
}
}
+ @Test
+ public void testDifferentCQSIThreadPoolsForParallelConnection() throws
Exception {
+ try (Connection conn = getParallelConnection()) {
+ ParallelPhoenixConnection pr =
conn.unwrap(ParallelPhoenixConnection.class);
+ PhoenixConnection pConn;
+ PhoenixConnection pConn2;
+ if
(CLUSTERS.getJdbcUrl1(haGroup).equals(pr.getFutureConnection1().get().getURL()))
{
+ pConn = pr.getFutureConnection1().get();
+ pConn2 = pr.getFutureConnection2().get();
+ } else {
+ pConn = pr.getFutureConnection2().get();
+ pConn2 = pr.getFutureConnection1().get();
+ }
+
+ // verify connection#1
+ ConnectionQueryServices cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup),
clientProperties);
+ ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
+ // Check that same ThreadPoolExecutor object is used for CQSIs
+ ThreadPoolExecutor threadPoolExecutor1 =
extractThreadPoolExecutorFromCQSI(cqsi);
+ Assert.assertSame(threadPoolExecutor1,
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+
+ // verify connection#2
+ cqsi =
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl2(haGroup),
clientProperties);
+ cqsiFromConn = pConn2.getQueryServices();
+ Assert.assertSame(cqsi, cqsiFromConn);
+ // Check that same ThreadPoolExecutor object is used for CQSIs
+ ThreadPoolExecutor threadPoolExecutor2 =
extractThreadPoolExecutorFromCQSI(cqsi);
+ Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi),
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+
+ // Check that both threadPools for parallel connections are
different.
+ assertNotSame(threadPoolExecutor1, threadPoolExecutor2);
+ }
+ }
+
/**
* Test Phoenix connection creation and basic operations with HBase
cluster(s) unavailable.
*/
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java
new file mode 100644
index 0000000000..f2edfcb9cd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionQueryServicesImplThreadPoolIT extends BaseTest {
+
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(ConnectionQueryServicesImplThreadPoolIT.class);
+ private AtomicInteger counter = new AtomicInteger();
+ private static HBaseTestingUtility hbaseTestUtil;
+ private String tableName;
+ private static final String CONN_QUERY_SERVICE_CREATE_TABLE =
"CONN_QUERY_SERVICE_CREATE_TABLE";
+ private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1";
+ private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2";
+ private static final int TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 13;
+ private static final int TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE = 17;
+ private static final int TEST_CQSI_THREAD_POOL_MAX_THREADS = 19;
+ private static final int TEST_CQSI_THREAD_POOL_MAX_QUEUE = 23;
+
+
+
+    /**
+     * One-time fixture: hands {@link InstanceResolver} a {@link ConfigurationFactory} whose
+     * configurations enable the CQSI thread pool with this test's sizes, starts an HBase mini
+     * cluster on such a configuration, stores the resulting JDBC URL in {@code url} and
+     * registers the Phoenix driver.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        InstanceResolver.clearSingletons();
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+            // Fresh HBase configuration carrying every CQSI thread pool setting under test.
+            private Configuration newThreadPoolConfiguration() {
+                Configuration poolConf = HBaseConfiguration.create();
+                poolConf.set(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true));
+                poolConf.set(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+                        Integer.toString(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS));
+                poolConf.set(CQSI_THREAD_POOL_CORE_POOL_SIZE,
+                        Integer.toString(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE));
+                poolConf.set(CQSI_THREAD_POOL_MAX_THREADS,
+                        Integer.toString(TEST_CQSI_THREAD_POOL_MAX_THREADS));
+                poolConf.set(CQSI_THREAD_POOL_MAX_QUEUE,
+                        Integer.toString(TEST_CQSI_THREAD_POOL_MAX_QUEUE));
+                poolConf.set(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, Boolean.toString(true));
+                return poolConf;
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                return newThreadPoolConfiguration();
+            }
+
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                // Copy the pool settings first, then layer the caller's configuration on top.
+                Configuration merged = new Configuration(newThreadPoolConfiguration());
+                merged.addResource(confToClone);
+                return merged;
+            }
+        });
+
+        Configuration clusterConf =
+                HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        hbaseTestUtil = new HBaseTestingUtility(clusterConf);
+        setUpConfigForMiniCluster(clusterConf);
+        hbaseTestUtil.startMiniCluster();
+
+        String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    /**
+     * Shuts down the mini cluster if one was started and always resets the server metadata
+     * cache. Shutdown is best-effort: a failure is logged instead of being rethrown, so it
+     * cannot mask the outcome of the tests themselves.
+     */
+    @AfterClass
+    public static void tearDownMiniCluster() {
+        try {
+            if (hbaseTestUtil != null) {
+                hbaseTestUtil.shutdownMiniCluster();
+            }
+        } catch (Exception e) {
+            // Deliberately not rethrown, but keep the failure visible in the test log.
+            LOGGER.warn("Failed to shut down the HBase mini cluster", e);
+        } finally {
+            ServerMetadataCacheTestImpl.resetCache();
+        }
+    }
+
+    /** Gives every test its own, freshly created table. */
+    @Before
+    public void setUp() throws Exception {
+        this.tableName = generateUniqueName();
+        createTable(this.tableName);
+    }
+
+    /**
+     * Builds a JDBC URL from the test cluster {@code url} with the given principal applied.
+     *
+     * @param principalName principal to attach to the connection info
+     * @return the URL string produced by the principal-specific connection info
+     */
+    private String connUrlWithPrincipal(String principalName) throws SQLException {
+        ConnectionInfo baseInfo = ConnectionInfo.create(url, null, null);
+        ConnectionInfo principalInfo = baseInfo.withPrincipal(principalName);
+        return principalInfo.toUrl();
+    }
+
+    /**
+     * Verifies that a table handed out by the CQSI is an {@link HTable} whose private
+     * {@code pool} executor carries the configured thread pool settings.
+     */
+    @Test
+    public void checkHTableThreadPoolExecutorSame() throws Exception {
+        // try-with-resources: the table handle used to be leaked by this test.
+        try (Table table = createCQSI(null).getTable(tableName.getBytes())) {
+            assertTrue(table instanceof HTable);
+            // The executor is only reachable reflectively through the private "pool" field.
+            Field poolField = table.getClass().getDeclaredField("pool");
+            poolField.setAccessible(true);
+            validateThreadPoolExecutor((ThreadPoolExecutor) poolField.get(table));
+        }
+    }
+
+    /**
+     * Verifies that the batch pool of the HBase connection behind a CQSI is the very executor
+     * held by that CQSI, and that CQSIs created for different principals use distinct pools.
+     */
+    @Test
+    public void checkHConnectionThreadPoolExecutorSame() throws Exception {
+        // First principal: take the batch pool out of the HBase connection behind its CQSI.
+        ConnectionImplementation firstConnection =
+                extractConnectionFromCQSI(createCQSI("hello"));
+        ThreadPoolExecutor firstBatchPool = extractBatchPool(firstConnection);
+
+        // Second principal: capture the executor held by the CQSI itself ...
+        ConnectionQueryServices secondCqsi = createCQSI("bye");
+        ThreadPoolExecutor secondCqsiPool = extractThreadPoolExecutorFromCQSI(secondCqsi);
+        // ... and the batch pool of the connection behind a CQSI requested for that principal.
+        ConnectionImplementation secondConnection =
+                extractConnectionFromCQSI(createCQSI("bye"));
+        ThreadPoolExecutor secondBatchPool = extractBatchPool(secondConnection);
+
+        // Same principal: the CQSI executor and the connection batch pool are one object.
+        assertSame(secondCqsiPool, secondBatchPool);
+        // Different principals: the batch pools must not be shared.
+        assertNotSame(firstBatchPool, secondBatchPool);
+
+        // Every executor seen above must carry the configured settings.
+        validateThreadPoolExecutor(firstBatchPool);
+        validateThreadPoolExecutor(secondBatchPool);
+        validateThreadPoolExecutor(secondCqsiPool);
+    }
+
+    /**
+     * Reads the private {@code batchPool} field declared on the runtime class of {@code conn}.
+     *
+     * @param conn connection to inspect
+     * @return the field value, cast to {@link ThreadPoolExecutor}
+     * @throws NoSuchFieldException if the runtime class declares no {@code batchPool} field
+     * @throws IllegalAccessException if the field cannot be read
+     */
+    private static ThreadPoolExecutor extractBatchPool(ConnectionImplementation conn)
+            throws NoSuchFieldException, IllegalAccessException {
+        Class<?> connectionClass = conn.getClass();
+        Field poolField = connectionClass.getDeclaredField("batchPool");
+        poolField.setAccessible(true);
+        Object pool = poolField.get(conn);
+        return (ThreadPoolExecutor) pool;
+    }
+
+    /**
+     * Requests CQSIs for two principals from four concurrent threads (two per principal) and
+     * verifies in each thread that the CQSI obtained shares its thread pool with the CQSI
+     * created up front for the same principal, and not with the other principal's.
+     */
+    @Test
+    public void testMultipleCQSIThreadPoolsInParallel() throws Exception {
+        ConnectionQueryServices cqsiExternal1 = createCQSI(CONN_QUERY_SERVICE_1);
+        ConnectionQueryServices cqsiExternal2 = createCQSI(CONN_QUERY_SERVICE_2);
+
+        Thread[] verifiers = {
+            newCqsiVerifierThread(CONN_QUERY_SERVICE_1, cqsiExternal1, cqsiExternal2),
+            newCqsiVerifierThread(CONN_QUERY_SERVICE_1, cqsiExternal1, cqsiExternal2),
+            newCqsiVerifierThread(CONN_QUERY_SERVICE_2, cqsiExternal2, cqsiExternal1),
+            newCqsiVerifierThread(CONN_QUERY_SERVICE_2, cqsiExternal2, cqsiExternal1)
+        };
+        for (Thread verifier : verifiers) {
+            verifier.start();
+        }
+        for (Thread verifier : verifiers) {
+            verifier.join();
+        }
+
+        // A verifier only bumps the counter after all of its checks have passed.
+        assertEquals(4, counter.get());
+    }
+
+    /**
+     * Creates (but does not start) a thread that obtains a CQSI for {@code serviceName},
+     * checks its thread pool against the two reference CQSIs and increments {@code counter}
+     * on success.
+     *
+     * @param serviceName principal to request the CQSI for
+     * @param sameService CQSI whose thread pool must be the same object
+     * @param otherService CQSI whose thread pool must be a different object
+     */
+    private Thread newCqsiVerifierThread(String serviceName,
+            ConnectionQueryServices sameService, ConnectionQueryServices otherService) {
+        return new Thread(() -> {
+            try {
+                ConnectionQueryServices cqsi = createCQSI(serviceName);
+                checkSameThreadPool(sameService, cqsi);
+                checkDifferentThreadPool(otherService, cqsi);
+                validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi));
+                counter.incrementAndGet();
+            } catch (Exception | AssertionError e) {
+                // Failed assertions are Errors, not Exceptions: log them here with context
+                // instead of letting them escape to the default uncaught-exception handler.
+                // The counter stays untouched, so the test still fails on the final check.
+                LOGGER.error("CQSI thread pool verification failed for {}", serviceName, e);
+            }
+        });
+    }
+
+    /** Asserts that both CQSIs hold the very same {@link ThreadPoolExecutor} instance. */
+    private void checkSameThreadPool(ConnectionQueryServices cqsi1,
+            ConnectionQueryServices cqsi2) throws NoSuchFieldException, IllegalAccessException {
+        ThreadPoolExecutor firstPool = extractThreadPoolExecutorFromCQSI(cqsi1);
+        ThreadPoolExecutor secondPool = extractThreadPoolExecutorFromCQSI(cqsi2);
+        assertSame(firstPool, secondPool);
+    }
+
+    /** Asserts that the two CQSIs hold distinct {@link ThreadPoolExecutor} instances. */
+    private void checkDifferentThreadPool(ConnectionQueryServices cqsi1,
+            ConnectionQueryServices cqsi2) throws NoSuchFieldException, IllegalAccessException {
+        ThreadPoolExecutor firstPool = extractThreadPoolExecutorFromCQSI(cqsi1);
+        ThreadPoolExecutor secondPool = extractThreadPoolExecutorFromCQSI(cqsi2);
+        assertNotSame(firstPool, secondPool);
+    }
+
+    /**
+     * Opens a connection for the given principal and returns the query services behind it.
+     * The connection itself is closed before returning so repeated calls do not leak it.
+     *
+     * @param serviceName principal used to build the connection URL
+     * @return the query services of the connection opened for that principal
+     */
+    private ConnectionQueryServices createCQSI(String serviceName) throws SQLException {
+        String principalURL = connUrlWithPrincipal(serviceName);
+        // NOTE(review): assumes the query services stay usable after the connection that
+        // exposed them is closed - confirm against the driver's CQSI caching.
+        try (Connection conn = DriverManager.getConnection(principalURL)) {
+            return conn.unwrap(PhoenixConnection.class).getQueryServices();
+        }
+    }
+
+    /**
+     * Asserts that the executor's keep-alive time, core size, maximum size and remaining
+     * queue capacity equal the TEST_CQSI_THREAD_POOL_* constants of this class.
+     */
+    private void validateThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
+        long keepAliveSeconds = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
+        assertEquals(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, keepAliveSeconds);
+
+        int corePoolSize = threadPoolExecutor.getCorePoolSize();
+        assertEquals(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE, corePoolSize);
+
+        int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
+        assertEquals(TEST_CQSI_THREAD_POOL_MAX_THREADS, maximumPoolSize);
+
+        // Remaining capacity only equals the configured bound while nothing is queued.
+        int remainingCapacity = threadPoolExecutor.getQueue().remainingCapacity();
+        assertEquals(TEST_CQSI_THREAD_POOL_MAX_QUEUE, remainingCapacity);
+    }
+
+
+    /**
+     * Creates the given table (if absent) through a connection opened for the
+     * {@code CONN_QUERY_SERVICE_CREATE_TABLE} principal.
+     *
+     * @param tableName name of the table to create
+     * @throws SQLException if connecting or executing the DDL fails
+     */
+    private void createTable(String tableName) throws SQLException {
+        String createTableDdl = String.format(
+                "CREATE TABLE IF NOT EXISTS %s (K VARCHAR(10) NOT NULL"
+                        + " PRIMARY KEY, V VARCHAR)", tableName);
+        String princURL = connUrlWithPrincipal(CONN_QUERY_SERVICE_CREATE_TABLE);
+        LOGGER.info("Connection Query Service : {} URL : {}",
+                CONN_QUERY_SERVICE_CREATE_TABLE, princURL);
+        try (Connection conn = DriverManager.getConnection(princURL);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(createTableDdl);
+        } catch (Exception e) {
+            // Record the failing statement in the test log, then let the failure propagate.
+            LOGGER.error("Failed to execute DDL: {}", createTableDdl, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads the private {@code connection} field declared on the runtime class of
+     * {@code cqsi}.
+     *
+     * @param cqsi query services instance to inspect
+     * @return the field value, cast to {@link ConnectionImplementation}
+     * @throws NoSuchFieldException if the runtime class declares no {@code connection} field
+     * @throws IllegalAccessException if the field cannot be read
+     */
+    private ConnectionImplementation extractConnectionFromCQSI(ConnectionQueryServices cqsi)
+            throws NoSuchFieldException, IllegalAccessException {
+        Class<?> cqsiClass = cqsi.getClass();
+        Field connectionField = cqsiClass.getDeclaredField("connection");
+        connectionField.setAccessible(true);
+        Object connection = connectionField.get(cqsi);
+        return (ConnectionImplementation) connection;
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index a7fcfd2cb3..a888a8ac1a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -83,6 +83,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -110,6 +111,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -2188,4 +2190,10 @@ public abstract class BaseTest {
}
return false;
}
+
+    /**
+     * Reflectively reads the {@code threadPoolExecutor} field of the given query services.
+     * The field is looked up on the runtime class first and then on each superclass, so the
+     * lookup also works when {@code cqs} is an instance of a subclass of the declaring class.
+     *
+     * @param cqs query services instance to inspect
+     * @return the field value, cast to {@link ThreadPoolExecutor}
+     * @throws NoSuchFieldException if no class in the hierarchy declares the field
+     * @throws IllegalAccessException if the field cannot be read
+     */
+    public static ThreadPoolExecutor extractThreadPoolExecutorFromCQSI(
+            final ConnectionQueryServices cqs)
+            throws NoSuchFieldException, IllegalAccessException {
+        Class<?> type = cqs.getClass();
+        while (type != null) {
+            try {
+                Field poolField = type.getDeclaredField("threadPoolExecutor");
+                poolField.setAccessible(true);
+                return (ThreadPoolExecutor) poolField.get(cqs);
+            } catch (NoSuchFieldException notDeclaredHere) {
+                // getDeclaredField ignores inherited fields; keep looking further up.
+                type = type.getSuperclass();
+            }
+        }
+        throw new NoSuchFieldException("threadPoolExecutor");
+    }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 048bf250aa..f25e7a04ed 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -21,13 +21,21 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
@@ -42,6 +50,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -58,6 +70,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.phoenix.SystemExitRule;
import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -134,6 +147,38 @@ public class ConnectionQueryServicesImplTest {
doCallRealMethod().when(mockCqs).dropTables(Mockito.any());
}
+    /**
+     * Constructs a {@link ConnectionQueryServicesImpl} from mocked services whose properties
+     * enable the CQSI thread pool, and checks that its {@code threadPoolExecutor} field holds
+     * an executor matching those properties.
+     */
+    @Test
+    public void testCQSIThreadPoolCreation()
+            throws SQLException, NoSuchFieldException, IllegalAccessException {
+        ReadOnlyProps poolProps = createCQSIThreadPoolReadOnlyProps();
+        QueryServices queryServices = Mockito.mock(QueryServices.class);
+        when(queryServices.getProps()).thenReturn(poolProps);
+        ConnectionInfo connectionInfo = Mockito.mock(ConnectionInfo.class);
+        when(connectionInfo.asProps()).thenReturn(poolProps);
+
+        ConnectionQueryServicesImpl cqs =
+                new ConnectionQueryServicesImpl(queryServices, connectionInfo, new Properties());
+
+        // The executor is private state; reflection is the only way to reach it.
+        Field poolField = ConnectionQueryServicesImpl.class.getDeclaredField("threadPoolExecutor");
+        poolField.setAccessible(true);
+        ThreadPoolExecutor pool = (ThreadPoolExecutor) poolField.get(cqs);
+        assertNotNull(pool);
+
+        int expectedCoreSize = poolProps.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE, -1);
+        assertEquals(expectedCoreSize, pool.getCorePoolSize());
+
+        int expectedMaxThreads = poolProps.getInt(CQSI_THREAD_POOL_MAX_THREADS, -1);
+        assertEquals(expectedMaxThreads, pool.getMaximumPoolSize());
+
+        assertEquals(LinkedBlockingQueue.class, pool.getQueue().getClass());
+
+        int expectedQueueCapacity = poolProps.getInt(CQSI_THREAD_POOL_MAX_QUEUE, -1);
+        assertEquals(expectedQueueCapacity, pool.getQueue().remainingCapacity());
+
+        long expectedKeepAlive = poolProps.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, -1);
+        assertEquals(expectedKeepAlive, pool.getKeepAliveTime(TimeUnit.SECONDS));
+
+        assertTrue(pool.allowsCoreThreadTimeOut());
+    }
+
+    /**
+     * Builds read-only properties that enable the CQSI thread pool with keep-alive 13s,
+     * core size 17, max threads 19, max queue 23 and core thread timeout allowed.
+     */
+    private static ReadOnlyProps createCQSIThreadPoolReadOnlyProps() {
+        Map<String, String> poolSettings = new HashMap<>();
+        poolSettings.put(CQSI_THREAD_POOL_ENABLED, String.valueOf(true));
+        poolSettings.put(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, String.valueOf(13));
+        poolSettings.put(CQSI_THREAD_POOL_CORE_POOL_SIZE, String.valueOf(17));
+        poolSettings.put(CQSI_THREAD_POOL_MAX_THREADS, String.valueOf(19));
+        poolSettings.put(CQSI_THREAD_POOL_MAX_QUEUE, String.valueOf(23));
+        poolSettings.put(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, String.valueOf(true));
+        return new ReadOnlyProps(poolSettings);
+    }
+
@SuppressWarnings("unchecked")
@Test
public void testExceptionHandlingOnSystemNamespaceCreation() throws
Exception {
@@ -330,26 +375,26 @@ public class ConnectionQueryServicesImplTest {
public void testGetSysMutexTableWithName() throws Exception {
when(mockAdmin.tableExists(any())).thenReturn(true);
when(mockConn.getAdmin()).thenReturn(mockAdmin);
- when(mockConn.getTable(TableName.valueOf("SYSTEM.MUTEX")))
+ when(mockConn.getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any()))
.thenReturn(mockTable);
assertSame(mockCqs.getSysMutexTable(), mockTable);
verify(mockAdmin, Mockito.times(1)).tableExists(any());
verify(mockConn, Mockito.times(1)).getAdmin();
verify(mockConn, Mockito.times(1))
- .getTable(TableName.valueOf("SYSTEM.MUTEX"));
+ .getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any());
}
@Test
public void testGetSysMutexTableWithNamespace() throws Exception {
when(mockAdmin.tableExists(any())).thenReturn(false);
when(mockConn.getAdmin()).thenReturn(mockAdmin);
- when(mockConn.getTable(TableName.valueOf("SYSTEM:MUTEX")))
+ when(mockConn.getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any()))
.thenReturn(mockTable);
assertSame(mockCqs.getSysMutexTable(), mockTable);
verify(mockAdmin, Mockito.times(1)).tableExists(any());
verify(mockConn, Mockito.times(1)).getAdmin();
verify(mockConn, Mockito.times(1))
- .getTable(TableName.valueOf("SYSTEM:MUTEX"));
+ .getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any());
}
@Test