This is an automated email from the ASF dual-hosted git repository.
sanjeet pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 928d086ff0 PHOENIX-7626: Add metrics to capture HTable thread pool
utilization and contention (#2201)
928d086ff0 is described below
commit 928d086ff0628eb48a4c84fb176b968654532755
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Jun 28 08:24:51 2025 +0530
PHOENIX-7626: Add metrics to capture HTable thread pool utilization and
contention (#2201)
---
.../job/HTableThreadPoolWithUtilizationStats.java | 142 ++++++++++
.../monitoring/HTableThreadPoolHistograms.java | 176 ++++++++++++
.../monitoring/HTableThreadPoolMetricsManager.java | 158 +++++++++++
.../phoenix/monitoring/HistogramDistribution.java | 10 +
.../phoenix/monitoring/PercentileHistogram.java | 162 +++++++++++
.../PercentileHistogramDistribution.java | 101 +++++++
.../phoenix/monitoring/UtilizationHistogram.java | 62 +++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 54 +++-
.../org/apache/phoenix/query/QueryServices.java | 4 +
.../apache/phoenix/query/QueryServicesOptions.java | 14 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 48 ++++
.../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 119 +++++++-
.../monitoring/BaseHTableThreadPoolMetricsIT.java | 143 ++++++++++
.../monitoring/CQSIThreadPoolMetricsIT.java | 250 +++++++++++++++++
.../ExternalHTableThreadPoolMetricsIT.java | 299 +++++++++++++++++++++
.../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 9 +-
.../java/org/apache/phoenix/query/BaseTest.java | 11 +-
17 files changed, 1746 insertions(+), 16 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
new file mode 100644
index 0000000000..fab9865a33
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+
+/**
+ * <b>External User-Facing API</b>
+ * <p>
+ * A specialized ThreadPoolExecutor designed specifically for capturing HTable
thread pool
+ * utilization statistics. This class extends the standard ThreadPoolExecutor
with built-in
+ * instrumentation to automatically collect key utilization metrics including
active thread count
+ * and queue size.
+ * </p>
+ * <h3>Purpose</h3>
+ * <p>
+ * Use this ThreadPoolExecutor when you need to monitor and analyze the
performance characteristics
+ * of HTable thread pool. The collected metrics help in understanding thread
pool behavior,
+ * identifying bottlenecks, and optimizing thread pool configurations.
+ * </p>
+ * <h3>Setup and Configuration</h3>
+ * <p>
+ * When instantiating this thread pool executor, you must provide an
idempotent supplier that
+ * returns an instance of {@link HTableThreadPoolHistograms}. This supplier
enables the collection
+ * of utilization statistics. Within the supplier, you can also attach custom
tags to the
+ * {@link HTableThreadPoolHistograms} instance for enhanced monitoring and
filtering capabilities.
+ * </p>
+ * <p>
+ * <b>Important:</b> If you pass a null supplier, metrics collection will be
completely disabled.
+ * This can be useful in scenarios where you want to use the thread pool
without the overhead of
+ * collecting utilization statistics.
+ * </p>
+ * <h3>Consuming Metrics</h3>
+ * <p>
+ * To retrieve the collected metrics as percentile distributions:
+ * </p>
+ * <ol>
+ * <li>Call {@link PhoenixRuntime#getHTableThreadPoolHistograms()}</li>
+ * <li>Use the htableThreadPoolHistogramsName as the key to retrieve the list
of
+ * {@link org.apache.phoenix.monitoring.PercentileHistogramDistribution}
instances</li>
+ * <li>Each metric type will have its own distribution instance in the
returned list</li>
+ * </ol>
+ * <p>
+ * Refer to the {@link
org.apache.phoenix.monitoring.PercentileHistogramDistribution} documentation
+ * to understand how to extract percentile values from the recorded data.
+ * </p>
+ * <h3>Usage Examples</h3>
+ * <p>
+ * For comprehensive usage examples and best practices, refer to the following
integration tests:
+ * </p>
+ * <ul>
+ * <li>CQSIThreadPoolMetricsIT</li>
+ * <li>ExternalHTableThreadPoolMetricsIT</li>
+ * </ul>
+ * @see HTableThreadPoolHistograms
+ * @see PhoenixRuntime#getHTableThreadPoolHistograms()
+ * @see org.apache.phoenix.monitoring.PercentileHistogramDistribution
+ */
+public class HTableThreadPoolWithUtilizationStats extends ThreadPoolExecutor {
+
+ private final String htableThreadPoolHistogramsName;
+ private final Supplier<HTableThreadPoolHistograms>
hTableThreadPoolHistogramsSupplier;
+
+ /**
+ * Creates a new HTable thread pool executor with built-in utilization
statistics collection.
+ * <p>
+ * This constructor accepts all the standard {@link ThreadPoolExecutor}
parameters plus
+ * additional parameters specific to HTable thread pool monitoring. The
thread pool will
+ * automatically collect utilization metrics (active thread count and
queue size) during task
+ * execution. To retrieve the collected metrics, use
+ * {@link PhoenixRuntime#getHTableThreadPoolHistograms()}.
+ * </p>
+ * @param htableThreadPoolHistogramsName Unique identifier for this thread
pool's metrics. This
+ * name serves as the key in the
metrics map returned by
+ * {@link
PhoenixRuntime#getHTableThreadPoolHistograms()}.
+ * Choose a descriptive name that
identifies the purpose
+ * of this thread pool.
+ * @param supplier Idempotent supplier that provides
the
+ * {@link
HTableThreadPoolHistograms} instance for metrics
+ * collection. This supplier will be
called only the first
+ * time a metric is recorded for the
given
+ * htableThreadPoolHistogramsName.
Subsequent metric
+ * recordings will reuse the same
histogram instance.
+ * <b>Pass null to disable metrics
collection
+ * entirely</b>, which eliminates
monitoring overhead but
+ * provides no utilization
statistics.
+ * @see ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
BlockingQueue,
+ * ThreadFactory)
+ * @see HTableThreadPoolHistograms
+ * @see PhoenixRuntime#getHTableThreadPoolHistograms()
+ */
+ public HTableThreadPoolWithUtilizationStats(int corePoolSize, int
maximumPoolSize,
+ long keepAliveTime, TimeUnit
unit,
+ BlockingQueue<Runnable>
workQueue,
+ ThreadFactory threadFactory,
+ String
htableThreadPoolHistogramsName,
+
Supplier<HTableThreadPoolHistograms> supplier) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ this.htableThreadPoolHistogramsName = htableThreadPoolHistogramsName;
+ this.hTableThreadPoolHistogramsSupplier = supplier;
+ }
+
+ public void execute(Runnable runnable) {
+ Preconditions.checkNotNull(runnable);
+ if (hTableThreadPoolHistogramsSupplier != null) {
+
HTableThreadPoolMetricsManager.updateActiveThreads(htableThreadPoolHistogramsName,
+ this.getActiveCount(), hTableThreadPoolHistogramsSupplier);
+ // Should we offset queue size by available threads if
CorePoolSize == MaxPoolSize?
+ // Tasks will first be put into thread pool's queue and will be
consumed by a worker
+ // thread waiting for tasks to arrive in queue. But while a task
is in queue, queue
+ // size > 0 though active no. of threads might be less than
MaxPoolSize.
+
HTableThreadPoolMetricsManager.updateQueueSize(htableThreadPoolHistogramsName,
+ this.getQueue().size(),
hTableThreadPoolHistogramsSupplier);
+ }
+ super.execute(runnable);
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
new file mode 100644
index 0000000000..4bdd8fd65a
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.List;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * A collection of histograms that monitor HTable thread pool performance
metrics including thread
+ * utilization and queue contention. This class tracks two key metrics to help
identify thread pool
+ * bottlenecks and performance issues. <br/>
+ * <br/>
+ * <b>External User-Facing Class:</b><br/>
+ * This is an external user-facing class that should only be used in
conjunction with
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}. When
creating an instance of
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a
+ * {@link java.util.function.Supplier} of this class must be provided. <br/>
+ * <br/>
+ * <b>Monitored Metrics:</b><br/>
+ * <ul>
+ * <li><b>Active Threads Count</b> - Number of threads currently executing
tasks</li>
+ * <li><b>Queue Size</b> - Number of tasks waiting in the thread pool
queue</li>
+ * </ul>
+ * <br/>
+ * Each metric is captured using a {@link UtilizationHistogram} that provides
percentile
+ * distributions, min/max values, and operation counts for comprehensive
analysis. <br/>
+ * <br/>
+ * <b>Tags:</b><br/>
+ * Supports metadata tags for dimensional monitoring:
+ * <ul>
+ * <li><b>servers</b> - Connection quorum string (ZK quorum, master quorum,
etc.)</li>
+ * <li><b>cqsiName</b> - Principal identifier for CQSI instance
differentiation</li>
+ * <li>Custom tags via {@link #addTag(String, String)}</li>
+ * </ul>
+ * A supplier creating an instance can also add tags to it to provide additional
context. <br/>
+ * <br/>
+ * <b>Instance Management:</b><br/>
+ * For instances created internally by the CQSI class: One instance per unique
connection info is
+ * created at the CQSI level, and multiple CQSI instances having the same
connection info will share
+ * the same instance of this class. <br/>
+ * For instances created externally by users: This instance management detail
does not apply, and
+ * users have full control over instance creation. <br/>
+ * <br/>
+ * Use {@link #getThreadPoolHistogramsDistribution()} to retrieve immutable
snapshots of the
+ * collected metrics for monitoring and analysis.
+ */
+public class HTableThreadPoolHistograms {
+ /**
+ * Predefined tag keys for dimensional monitoring and contextual
categorization of histogram
+ * instances. These tags provide context about the connection and CQSI
instance associated with
+ * the metrics.
+ */
+ public enum Tag {
+ servers,
+ cqsiName,
+ }
+
+ /**
+ * Enum that captures the name of each of the monitored metrics. These
names correspond to the
+ * specific thread pool performance metrics being tracked.
+ */
+ public enum HistogramName {
+ ActiveThreadsCount,
+ QueueSize,
+ }
+
+ private final UtilizationHistogram activeThreadsHisto;
+ private final UtilizationHistogram queuedSizeHisto;
+
+ /**
+ * Creates a new instance of HTableThreadPoolHistograms with the specified
maximum values for
+ * thread pool and queue size.
+ * @param maxThreadPoolSize the maximum number of threads in the thread
pool, used to configure
+ * the active threads histogram
+ * @param maxQueueSize the maximum size of the thread pool queue,
used to configure the
+ * queue size histogram
+ */
+ public HTableThreadPoolHistograms(long maxThreadPoolSize, long
maxQueueSize) {
+ activeThreadsHisto = new UtilizationHistogram(maxThreadPoolSize,
+ HistogramName.ActiveThreadsCount.name());
+ queuedSizeHisto = new UtilizationHistogram(maxQueueSize,
HistogramName.QueueSize.name());
+ }
+
+ /**
+ * Updates the histogram that tracks active threads count with the current
number of active
+ * threads. <br/>
+ * <br/>
+ * This method is to be called from HTableThreadPoolWithUtilizationStats class
only and should not
+ * be used from outside Phoenix.
+ * @param activeThreads the current number of threads actively executing
tasks
+ */
+ public void updateActiveThreads(long activeThreads) {
+ activeThreadsHisto.addValue(activeThreads);
+ }
+
+ /**
+ * Updates the histogram that tracks queue size with the current number of
queued tasks. <br/>
+ * <br/>
+ * This method is to be called from HTableThreadPoolWithUtilizationStats class
only and should not
+ * be used from outside Phoenix.
+ * @param queuedSize the current number of tasks waiting in the thread
pool queue
+ */
+ public void updateQueuedSize(long queuedSize) {
+ queuedSizeHisto.addValue(queuedSize);
+ }
+
+ /**
+ * Adds a server tag for dimensional monitoring that identifies the
connection quorum string
+ * such as ZK quorum, master quorum, etc. This corresponds to the
"servers" tag key. <br/>
+ * <br/>
+ * This is an external user-facing method which can be called when
creating an instance of the
+ * class.
+ * @param value the connection quorum string value
+ */
+ public void addServerTag(String value) {
+ addTag(Tag.servers.name(), value);
+ }
+
+ /**
+ * Adds a CQSI name tag that captures the principal of the CQSI instance.
This corresponds to
+ * the "cqsiName" tag key. <br/>
+ * <br/>
+ * This is an external user-facing method which can be called while
creating an instance of the
+ * class.
+ * @param value the principal identifier for the CQSI instance
+ */
+ public void addCqsiNameTag(String value) {
+ addTag(Tag.cqsiName.name(), value);
+ }
+
+ /**
+ * Adds a custom tag with the specified key-value pair for dimensional
monitoring. This method
+ * allows adding arbitrary tags beyond the predefined servers and CQSI
name tags. <br/>
+ * <br/>
+ * This is an external user-facing method which can be called while
creating an instance of the
+ * class.
+ * @param key the tag key
+ * @param value the tag value
+ */
+ public void addTag(String key, String value) {
+ activeThreadsHisto.addTag(key, value);
+ queuedSizeHisto.addTag(key, value);
+ }
+
+ /**
+ * Returns a list of HistogramDistribution which are immutable snapshots
containing percentile
+ * distribution, min/max values, and count of values for the monitored
metrics (active threads
+ * count and queue size). <br/>
+ * <br/>
+ * This method is to be called from
+ * {@link HTableThreadPoolMetricsManager#getHistogramsForAllThreadPools()}
only and should not
+ * be used from outside Phoenix.
+ * @return list of HistogramDistribution instances representing
comprehensive snapshots of the
+ * metrics
+ */
+ public List<HistogramDistribution> getThreadPoolHistogramsDistribution() {
+ return
ImmutableList.of(activeThreadsHisto.getPercentileHistogramDistribution(),
+ queuedSizeHisto.getPercentileHistogramDistribution());
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
new file mode 100644
index 0000000000..52843ece87
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central registry and manager for HTable thread pool utilization and
contention metrics.
+ * <p>
+ * <b>Internal Use Only:</b> This class is designed for internal use only and
should not be used
+ * directly from outside Phoenix. External consumers should access thread pool
metrics through
+ * {@link
org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}.
+ * </p>
+ * <p>
+ * This class serves as a singleton registry that maintains {@link
HTableThreadPoolHistograms}
+ * instances across the entire application lifecycle. Each {@link
HTableThreadPoolHistograms}
+ * instance contains utilization metrics (active thread count and queue size
histograms) for a
+ * specific HTable thread pool identified by a unique histogram key.
+ * </p>
+ * <h3>Storage and Instance Management</h3>
+ * <p>
+ * The manager stores {@link HTableThreadPoolHistograms} instances in a
thread-safe
+ * {@link ConcurrentHashMap} with the following characteristics:
+ * </p>
+ * <ul>
+ * <li><b>Key-based storage:</b> Each histogram instance is identified by a
unique string key</li>
+ * <li><b>Lazy initialization:</b> Instances are created on-demand when first
accessed</li>
+ * <li><b>Shared instances:</b> Multiple threads/components can share the same
histogram instance
+ * using the same key</li>
+ * <li><b>Thread-safe operations:</b> All storage operations are atomic and
thread-safe</li>
+ * </ul>
+ * <h3>Integration with HTable Thread Pool Utilization</h3>
+ * <p>
+ * This manager is exclusively used by
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a
specialized
+ * ThreadPoolExecutor that automatically captures HTable thread pool
utilization statistics. The
+ * integration workflow is:
+ * </p>
+ * <ol>
+ * <li>The thread pool calls {@link #updateActiveThreads} and {@link
#updateQueueSize} to record
+ * metrics</li>
+ * <li>Metrics are stored in {@link HTableThreadPoolHistograms} instances
managed by this class</li>
+ * <li>External consumers access immutable metric snapshots through
+ * {@link
org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}, which
internally
+ * calls {@link #getHistogramsForAllThreadPools()} to return {@link
HistogramDistribution} instances
+ * containing percentile distributions, min/max values, and operation counts
for both active thread
+ * count and queue size metrics</li>
+ * </ol>
+ * <h3>Usage Patterns</h3>
+ * <p>
+ * <b>CQSI Level:</b> For ConnectionQueryServicesImpl thread pools, the
histogram key is always the
+ * connection URL, allowing multiple CQSI instances with the same connection
info to share the same
+ * {@link HTableThreadPoolHistograms} instance.
+ * </p>
+ * <p>
+ * <b>External Thread Pools:</b> For user-defined thread pools, the histogram
key can be the thread
+ * pool name or any unique identifier chosen by the application.
+ * </p>
+ * @see HTableThreadPoolHistograms
+ * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats
+ * @see org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()
+ */
+public class HTableThreadPoolMetricsManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(HTableThreadPoolMetricsManager.class);
+
+ private static final ConcurrentHashMap<String, HTableThreadPoolHistograms>
+ THREAD_POOL_HISTOGRAMS_MAP = new ConcurrentHashMap<>();
+
+ private HTableThreadPoolMetricsManager() {
+ }
+
+ public static Map<String, List<HistogramDistribution>>
getHistogramsForAllThreadPools() {
+ Map<String, List<HistogramDistribution>> map = new HashMap<>();
+ for (Map.Entry<String, HTableThreadPoolHistograms> entry
+ : THREAD_POOL_HISTOGRAMS_MAP.entrySet()) {
+ HTableThreadPoolHistograms hTableThreadPoolHistograms =
entry.getValue();
+ map.put(entry.getKey(),
+
hTableThreadPoolHistograms.getThreadPoolHistogramsDistribution());
+ }
+ return map;
+ }
+
+ private static HTableThreadPoolHistograms getThreadPoolHistograms(
+ String histogramKey, Supplier<HTableThreadPoolHistograms> supplier) {
+ if (supplier == null) {
+ return null;
+ }
+ return THREAD_POOL_HISTOGRAMS_MAP.computeIfAbsent(histogramKey, k ->
supplier.get());
+ }
+
+ /**
+ * Records the current number of active threads in the thread pool in the
histogram.
+ * @param histogramKey Key to uniquely identify {@link
HTableThreadPoolHistograms} instance.
+ * @param activeThreads Number of active threads in the thread pool.
+ * @param supplier An idempotent supplier of {@link
HTableThreadPoolHistograms}.
+ */
+ public static void updateActiveThreads(String histogramKey, int
activeThreads,
+ Supplier<HTableThreadPoolHistograms> supplier) {
+ HTableThreadPoolHistograms hTableThreadPoolHistograms =
+ getThreadPoolHistograms(histogramKey, supplier);
+ if (hTableThreadPoolHistograms != null) {
+ hTableThreadPoolHistograms.updateActiveThreads(activeThreads);
+ } else {
+ logWarningForNullSupplier(histogramKey);
+ }
+ }
+
+ /**
+ * Records the current number of tasks in the thread pool's queue in the
histogram.
+ * @param histogramKey Key to uniquely identify {@link
HTableThreadPoolHistograms} instance.
+ * @param queueSize Number of tasks in the HTable thread pool's queue.
+ * @param supplier An idempotent supplier of {@link
HTableThreadPoolHistograms}.
+ */
+ public static void updateQueueSize(String histogramKey, int queueSize,
+ Supplier<HTableThreadPoolHistograms> supplier) {
+ HTableThreadPoolHistograms hTableThreadPoolHistograms =
+ getThreadPoolHistograms(histogramKey, supplier);
+ if (hTableThreadPoolHistograms != null) {
+ hTableThreadPoolHistograms.updateQueuedSize(queueSize);
+ } else {
+ logWarningForNullSupplier(histogramKey);
+ }
+ }
+
+ private static void logWarningForNullSupplier(String threadPoolName) {
+ LOGGER.warn("No HTable thread pool histograms created for thread pool
{}", threadPoolName);
+ }
+
+ @VisibleForTesting
+ public static void clearHTableThreadPoolHistograms() {
+ THREAD_POOL_HISTOGRAMS_MAP.clear();
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
index 4e8039c27f..269bd65c9c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.monitoring;
import java.util.Map;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
public interface HistogramDistribution {
public long getMin();
@@ -30,4 +32,12 @@ public interface HistogramDistribution {
public Map<String, Long> getRangeDistributionMap();
+ default ImmutableMap<String, Long> getPercentileDistributionMap() {
+ throw new UnsupportedOperationException("Percentile Histogram
Distribution is not "
+ + "supported!!");
+ }
+
+ default ImmutableMap<String, String> getTags() {
+ return ImmutableMap.of();
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
new file mode 100644
index 0000000000..adbaabdc55
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for creating percentile-based histograms that capture
and analyze the
+ * distribution of recorded values. This histogram tracks values up to a
specified maximum and
+ * provides statistical analysis including percentile distributions, min/max
values, and operation
+ * counts.
+ * <p>
+ * <b>Internal Use Only:</b> This class is for internal use only and should
not be used directly by
+ * users of Phoenix.
+ * </p>
+ * <br/>
+ * Key features:
+ * <ul>
+ * <li>Records values efficiently using {@link org.HdrHistogram.Histogram}
internally</li>
+ * <li>Generates percentile distributions through concrete implementations</li>
+ * <li>Supports optional metadata tags for enhanced monitoring context</li>
+ * <li>Generates immutable statistical snapshots with percentile analysis</li>
+ * <li>Automatically handles values exceeding the maximum trackable limit</li>
+ * </ul>
+ * <br/>
+ * Usage workflow:
+ * <ol>
+ * <li>Record values using {@link #addValue(long)}</li>
+ * <li>Optionally attach metadata using {@link #addTag(String, String)}</li>
+ * <li>Generate distribution snapshots via {@link
#getPercentileHistogramDistribution()}</li>
+ * </ol>
+ * <br/>
+ * Concrete implementations must define {@link
#generateDistributionMap(Histogram)} to specify which
+ * percentiles and statistics to include in the distribution.
+ */
+abstract class PercentileHistogram {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PercentileHistogram.class);
+
+ // Strings used to create metrics names.
+ static final String NUM_OPS_METRIC_NAME = "_num_ops";
+ static final String MIN_METRIC_NAME = "_min";
+ static final String MAX_METRIC_NAME = "_max";
+ static final String MEDIAN_METRIC_NAME = "_median";
+ static final String TWENTY_FIFTH_PERCENTILE_METRIC_NAME =
"_25th_percentile";
+ static final String SEVENTY_FIFTH_PERCENTILE_METRIC_NAME =
"_75th_percentile";
+ static final String NINETIETH_PERCENTILE_METRIC_NAME = "_90th_percentile";
+ static final String NINETY_FIFTH_PERCENTILE_METRIC_NAME =
"_95th_percentile";
+
+ private Histogram prevHistogram = null;
+ /**
+ * The recorder that records the values in a {@link Histogram}.
+ */
+ private final Recorder recorder;
+ private final String name;
+ private final long maxUtil;
+ private Map<String, String> tags = null;
+
+ PercentileHistogram(long maxUtil, String name) {
+ this.name = name;
+ this.maxUtil = maxUtil;
+ this.recorder = new Recorder(maxUtil, 2);
+ }
+
+ /**
+ * Records a value in the histogram for statistical analysis. The recorded
value will be
+ * included in subsequent percentile calculations and distribution
snapshot generated by
+ * {@link #getPercentileHistogramDistribution()}. <br/>
+ * <br/>
+ * Values exceeding the maximum trackable limit (specified during
histogram construction) are
+ * automatically rejected with a warning logged. This prevents histogram
corruption while
+ * maintaining recording performance.
+ * @param value the value to record, must be within the histogram's
trackable range
+ */
+ void addValue(long value) {
+ if (value > maxUtil) {
+ // Ignoring recording value more than maximum trackable value.
+ LOGGER.warn("Histogram recording higher value than maximum.
Ignoring it.");
+ return;
+ }
+ recorder.recordValue(value);
+ }
+
+ /**
+ * Generates an immutable snapshot containing the percentile distribution
of values recorded
+ * since the last call to this method. The snapshot includes statistical
metrics such as min/max
+ * values, operation count, percentile values, and any attached tags. <br/>
+ * <br/>
+ * This method captures an interval histogram (values recorded since the
previous snapshot) and
+ * delegates to concrete implementations via {@link
#generateDistributionMap(Histogram)} to
+ * determine which specific percentiles and metrics to include. <br/>
+ * <br/>
+ * The returned {@link PercentileHistogramDistribution} is thread-safe and
immutable, making it
+ * suitable for concurrent access by monitoring systems.
+ * @return an immutable {@link HistogramDistribution} containing
percentile analysis and
+ * statistics
+ */
+ HistogramDistribution getPercentileHistogramDistribution() {
+ Histogram histogram =
this.recorder.getIntervalHistogram(prevHistogram);
+ HistogramDistribution distribution;
+ if (tags == null) {
+ distribution = new PercentileHistogramDistribution(name,
histogram.getMinValue(),
+ histogram.getMaxValue(), histogram.getTotalCount(),
+ generateDistributionMap(histogram));
+ } else {
+ distribution = new PercentileHistogramDistribution(name,
histogram.getMinValue(),
+ histogram.getMaxValue(), histogram.getTotalCount(),
+ generateDistributionMap(histogram), tags);
+ }
+ this.prevHistogram = histogram;
+ return distribution;
+ }
+
+ /**
+ * Attaches a metadata tag to the histogram as a key-value pair. Tags
provide additional context
+ * about the histogram data and are included in all distribution snapshots
generated by
+ * {@link #getPercentileHistogramDistribution()}. <br/>
+ * <br/>
+ * Tags are commonly used for dimensional monitoring, allowing metrics to
be filtered and
+ * grouped by tag names. Multiple tags can be added to the same histogram,
and duplicate keys
+ * will overwrite previous values.
+ * @param key the tag key
+ * @param value the tag value
+ */
+ void addTag(String key, String value) {
+ if (tags == null) {
+ tags = new HashMap<>();
+ }
+ tags.put(key, value);
+ }
+
+ /**
+ * Generates a map of percentile distribution where the key is the
percentile name (e.g.,
+ * "_90th_percentile", "_95th_percentile", "_median", etc.) and the value
is the actual
+ * percentile value from the histogram snapshot. This method is for
internal use only and is
+ * called from {@link #getPercentileHistogramDistribution()}.
+ * @param snapshot the snapshot of the {@link Histogram}
+ * @return a map of percentile distribution with percentile names as keys
and their
+ * corresponding values
+ */
+ protected abstract Map<String, Long> generateDistributionMap(Histogram
snapshot);
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
new file mode 100644
index 0000000000..5c81d9c5da
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * <b>External User-Facing API</b>
+ * <p>
+ * An immutable snapshot of percentile distribution data captured by a {@link
PercentileHistogram}.
+ * This class provides a consumer-friendly interface to access histogram
statistics without exposing
+ * the underlying {@link org.HdrHistogram.Histogram} instances. <br/>
+ * <br/>
+ * This class contains:
+ * <ul>
+ * <li>Percentile distribution map with percentile names as keys and their
values</li>
+ * <li>Basic statistics (minimum, maximum, total count)</li>
+ * <li>Optional tags for additional metadata</li>
+ * </ul>
+ * <br/>
+ * Use {@link PercentileHistogram#getPercentileHistogramDistribution()} to get
the percentile
+ * distribution captured by the histogram. Use {@link #getTags()} to access
any tags attached to the
+ * histogram. <br/>
+ * <br/>
+ * All data in this class is immutable after construction, making it safe for
concurrent access.
+ */
+public class PercentileHistogramDistribution extends HistogramDistributionImpl
{
+ private final ImmutableMap<String, Long> percentileDistributionMap;
+ private ImmutableMap<String, String> tags = null;
+
+ PercentileHistogramDistribution(String histoName, long min, long max, long
count,
+ Map<String, Long>
percentileDistributionMap) {
+ super(histoName, min, max, count, Collections.emptyMap());
+ this.percentileDistributionMap =
ImmutableMap.copyOf(percentileDistributionMap);
+ }
+
+ PercentileHistogramDistribution(String histoName, long min, long max, long
count,
+ Map<String, Long>
percentileDistributionMap,
+ Map<String, String> tags) {
+ this(histoName, min, max, count, percentileDistributionMap);
+ this.tags = ImmutableMap.copyOf(tags);
+ }
+
+ /**
+ * Returns an immutable map containing the percentile distribution and
statistical data captured
+ * by the histogram. The map contains metric names as keys and their
corresponding values. <br/>
+ * <br/>
+ * The map includes:
+ * <ul>
+ * <li>Percentile values (e.g., "_90th_percentile", "_95th_percentile",
"_median")</li>
+ * <li>Statistical metrics (e.g., "_min", "_max", "_num_ops")</li>
+ * </ul>
+ * <br/>
+ * This is the primary method for accessing percentile analysis results.
The specific
+ * percentiles and statistics included depend on the concrete {@link
PercentileHistogram}
+ * implementation that generated this distribution. <br/>
+ * <br/>
+ * The returned map is immutable and safe for concurrent access.
+ * @return an immutable map of metric names to their calculated values
+ */
+ public ImmutableMap<String, Long> getPercentileDistributionMap() {
+ return percentileDistributionMap;
+ }
+
+ public Map<String, Long> getRangeDistributionMap() {
+ throw new UnsupportedOperationException("Range Histogram Distribution
is not supported!!");
+ }
+
+ /**
+ * Returns the metadata tags associated with this histogram distribution.
Tags provide
+ * additional context about the histogram data and are commonly used for
dimensional monitoring,
+ * allowing metrics to be filtered and grouped by tag names. <br/>
+ * <br/>
+ * Tags are attached to the histogram using {@link
PercentileHistogram#addTag(String, String)}
+ * before generating the distribution snapshot. <br/>
+ * <br/>
+ * The returned map is immutable and safe for concurrent access.
+ * @return an immutable map of tag key-value pairs, or an empty map if no
tags were attached
+ */
+ public ImmutableMap<String, String> getTags() {
+ return tags == null ? ImmutableMap.of() : tags;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
new file mode 100644
index 0000000000..bc136fec3f
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.HdrHistogram.Histogram;
+
+/**
+ * A concrete implementation of {@link PercentileHistogram} specifically
designed for tracking
+ * utilization metrics. This histogram captures and provides a comprehensive
set of statistical
+ * metrics including percentile distributions, operation counts, and min/max
values. <br/>
+ * <br/>
+ * <p>
+ * <b>Internal Use Only:</b> This class is for internal use only and should
not be used directly by
+ * users of Phoenix.
+ * </p>
+ * <p>
+ * The histogram generates the following metrics:
+ * </p>
+ * <ul>
+ * <li>Number of operations (total count)</li>
+ * <li>Minimum and maximum recorded values</li>
+ * <li>25th, 50th (median), 75th, 90th, and 95th percentiles</li>
+ * </ul>
+ */
+class UtilizationHistogram extends PercentileHistogram {
+
+ UtilizationHistogram(long maxUtil, String name) {
+ super(maxUtil, name);
+ }
+
+ protected Map<String, Long> generateDistributionMap(Histogram snapshot) {
+ Map<String, Long> metrics = new HashMap<>();
+ metrics.put(NUM_OPS_METRIC_NAME, snapshot.getTotalCount());
+ metrics.put(MIN_METRIC_NAME, snapshot.getMinValue());
+ metrics.put(MAX_METRIC_NAME, snapshot.getMaxValue());
+ metrics.put(TWENTY_FIFTH_PERCENTILE_METRIC_NAME,
snapshot.getValueAtPercentile(25));
+ metrics.put(MEDIAN_METRIC_NAME, snapshot.getValueAtPercentile(50));
+ metrics.put(SEVENTY_FIFTH_PERCENTILE_METRIC_NAME,
+ snapshot.getValueAtPercentile(75));
+ metrics.put(NINETIETH_PERCENTILE_METRIC_NAME,
snapshot.getValueAtPercentile(90));
+ metrics.put(NINETY_FIFTH_PERCENTILE_METRIC_NAME,
snapshot.getValueAtPercentile(95));
+ return metrics;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4329392bcd..c6055a0e10 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -75,6 +75,7 @@ import static
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAIL
import static
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
import static
org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE;
@@ -134,6 +135,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;
@@ -225,14 +227,18 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.TableResultIterator;
import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
+import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo;
import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.RPCConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats;
import org.apache.phoenix.log.ConnectionLimiter;
import org.apache.phoenix.log.DefaultConnectionLimiter;
import org.apache.phoenix.log.LoggingConnectionLimiter;
import org.apache.phoenix.log.QueryLoggerDisruptor;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
@@ -478,16 +484,18 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// Based on implementations used in
// org.apache.hadoop.hbase.client.ConnectionImplementation
final BlockingQueue<Runnable> workQueue = new
LinkedBlockingQueue<>(maxQueue);
+ Supplier<HTableThreadPoolHistograms>
hTableThreadPoolHistogramsSupplier =
+ getThreadPoolHistogramsSupplier(maxThreads, maxQueue);
this.threadPoolExecutor =
- new ThreadPoolExecutor(corePoolSize, maxThreads,
keepAlive, TimeUnit.SECONDS,
- workQueue, new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("CQSI-" + threadPoolName
- + "-" +
threadPoolNumber.incrementAndGet()
- + "-shared-pool-%d")
- .setUncaughtExceptionHandler(
-
Threads.LOGGING_EXCEPTION_HANDLER)
- .build());
+ new HTableThreadPoolWithUtilizationStats(corePoolSize,
maxThreads, keepAlive,
+ TimeUnit.SECONDS, workQueue, new
ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("CQSI-" + threadPoolName
+ + "-" + threadPoolNumber.incrementAndGet()
+ + "-shared-pool-%d")
+ .setUncaughtExceptionHandler(
+ Threads.LOGGING_EXCEPTION_HANDLER)
+ .build(), connectionInfo.toUrl(),
hTableThreadPoolHistogramsSupplier);
this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig
.getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
@@ -602,6 +610,34 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
+ private Supplier<HTableThreadPoolHistograms>
getThreadPoolHistogramsSupplier(
+ int maxThreadPoolSize, int maxQueueSize) {
+ if (this.config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED,
+ DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)) {
+ return new Supplier<HTableThreadPoolHistograms>() {
+ @Override
+ public HTableThreadPoolHistograms get() {
+ HTableThreadPoolHistograms hTableThreadPoolHistograms =
+ new HTableThreadPoolHistograms(maxThreadPoolSize,
maxQueueSize);
+ if (connectionInfo instanceof ZKConnectionInfo) {
+ hTableThreadPoolHistograms.addServerTag(
+ ((ZKConnectionInfo)
connectionInfo).getZkHosts());
+ } else if (connectionInfo instanceof
AbstractRPCConnectionInfo) {
+ hTableThreadPoolHistograms.addServerTag(
+ ((AbstractRPCConnectionInfo)
connectionInfo).getBoostrapServers());
+ } else {
+ throw new IllegalStateException("Unexpected connection
info type!!");
+ }
+ String cqsiName = connectionInfo.getPrincipal();
+ hTableThreadPoolHistograms.addCqsiNameTag(cqsiName != null
+ ? cqsiName : DEFAULT_QUERY_SERVICES_NAME);
+ return hTableThreadPoolHistograms;
+ }
+ };
+ }
+ return null;
+ }
+
private void openConnection() throws SQLException {
try {
this.connection =
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config,
threadPoolExecutor);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index c62888a5e5..ea95e05f2b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -468,6 +468,8 @@ public interface QueryServices extends SQLCloseable {
*/
String TESTS_MINI_CLUSTER_NUM_REGION_SERVERS =
"phoenix.tests.minicluster.numregionservers";
+ String TESTS_MINI_CLUSTER_NUM_MASTERS =
"phoenix.tests.minicluster.nummasters";
+
/**
* Config to inject any processing after the client retrieves dummy result
from the server.
@@ -509,6 +511,8 @@ public interface QueryServices extends SQLCloseable {
long DEFAULT_PHOENIX_METADATA_CACHE_UPDATE_ROWLOCK_TIMEOUT = 60000;
+ String CQSI_THREAD_POOL_METRICS_ENABLED =
"phoenix.cqsi.thread.pool.metrics.enabled";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 345ca39915..5c9210ef20 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -35,6 +35,7 @@ import static
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_ME
import static
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED;
import static
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME;
import static
org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -440,6 +441,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25;
public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
public static final Boolean
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
+ public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED =
false;
private final Configuration config;
@@ -552,7 +554,8 @@ public class QueryServicesOptions {
.setIfUnset(CQSI_THREAD_POOL_MAX_THREADS,
DEFAULT_CQSI_THREAD_POOL_MAX_THREADS)
.setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE,
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
.setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
- DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+ DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
+ .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED,
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user
set
@@ -793,6 +796,15 @@ public class QueryServicesOptions {
.getBoolean(TABLE_LEVEL_METRICS_ENABLED,
DEFAULT_IS_TABLE_LEVEL_METRICS_ENABLED);
}
+ public boolean isCQSIThreadPoolMetricsEnabled() {
+ return config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED,
+ DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
+ }
+
+ public void setCQSIThreadPoolMetricsEnabled(boolean enabled) {
+ config.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, enabled);
+ }
+
public void setTableLevelMetricsEnabled() {
set(TABLE_LEVEL_METRICS_ENABLED, true);
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 290368b47c..faa6d4a448 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
import org.apache.phoenix.monitoring.HistogramDistribution;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.PhoenixTableMetric;
@@ -1409,6 +1410,53 @@ public class PhoenixRuntime {
return TableMetricsManager.getSizeHistogramsForAllTables();
}
+ /**
+ * Retrieves comprehensive HTable thread pool utilization and contention
metrics collected
+ * across all monitored HTable thread pools.
+ * <p>
+ * This method provides access to detailed performance histograms that
track two critical thread
+ * pool metrics:
+ * </p>
+ * <ul>
+ * <li><b>Active Threads Count</b> - Distribution of the number of threads
actively executing
+ * tasks</li>
+ * <li><b>Queue Size</b> - Distribution of the number of tasks waiting in
thread pool
+ * queues</li>
+ * </ul>
+ * <p>
+ * The metrics are automatically collected by
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a
specialized
+ * ThreadPoolExecutor that instruments HTable operations. These statistics
help identify thread
+ * pool bottlenecks, optimize thread pool configurations, and monitor
overall HTable performance
+ * characteristics.
+ * </p>
+ * <p>
+ * <b>Metric Sources:</b>
+ * </p>
+ * <ul>
+ * <li><b>CQSI Thread Pools</b> - Internal Phoenix CQSI-level thread pools
(identified by
+ * connection URL)</li>
+ * <li><b>External Thread Pools</b> - User-defined HTable thread pools
created with
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}</li>
+ * </ul>
+ * <p>
+ * Each histogram includes percentile distributions (P50, P90, P95, P99,
etc.), min/max values,
+ * and operation counts. The returned map is keyed by histogram identifier
and contains
+ * dimensional tags for enhanced monitoring capabilities.
+ * </p>
+ * @return a map where keys are histogram identifiers (connection URLs for
CQSI pools, or custom
+ * names for external pools) and values are lists of {@link
HistogramDistribution}
+ * instances containing comprehensive utilization statistics.
Returns an empty map if no
+ * HTable thread pools have been monitored or if metrics
collection is disabled.
+ * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats
+ * @see HTableThreadPoolMetricsManager
+ * @see org.apache.phoenix.monitoring.HTableThreadPoolHistograms
+ * @see HistogramDistribution
+ */
+ public static Map<String, List<HistogramDistribution>>
getHTableThreadPoolHistograms() {
+ return HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
+ }
+
public static Map<String, List<HistogramDistribution>>
getAllConnectionQueryServicesHistograms() {
return
ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices();
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index 49eaa7ede7..ceb2e66d6a 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -42,11 +42,13 @@ import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
import java.lang.reflect.Field;
import java.sql.Connection;
@@ -55,12 +57,15 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
@@ -68,10 +73,15 @@ import
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilit
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.log.LogLevel;
import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
+import org.apache.phoenix.monitoring.HistogramDistribution;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -80,6 +90,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,8 +127,9 @@ public class ParallelPhoenixConnectionIT {
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE,
String.valueOf(17));
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS,
String.valueOf(19));
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE,
String.valueOf(23));
-
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
String.valueOf(true));
-
+
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+ String.valueOf(true));
+ GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED,
String.valueOf(true));
}
@AfterClass
@@ -215,6 +227,107 @@ public class ParallelPhoenixConnectionIT {
}
}
+ @Test
+ public void testCqsiThreadPoolMetricsForParallelConnection() throws
Exception {
+ try (Connection conn = getParallelConnection()) {
+ ParallelPhoenixConnection pr =
conn.unwrap(ParallelPhoenixConnection.class);
+
+ // Get details of connection#1
+ PhoenixConnection pConn1 = pr.getFutureConnection1().get();
+ Configuration config1 =
pConn1.getQueryServices().getConfiguration();
+ String zkQuorum1 = config1.get(HConstants.ZOOKEEPER_QUORUM);
+ String principal1 = config1.get(QueryServices.QUERY_SERVICES_NAME);
+
+ // Get details of connection#2
+ PhoenixConnection pConn2 = pr.getFutureConnection2().get();
+ Configuration config2 =
pConn2.getQueryServices().getConfiguration();
+ String zkQuorum2 = config2.get(HConstants.ZOOKEEPER_QUORUM);
+ String principal2 = config2.get(QueryServices.QUERY_SERVICES_NAME);
+
+ // Slow down connection#1
+ CountDownLatch latch = new CountDownLatch(1);
+ slowDownConnection(pr, pr.getFutureConnection1(),
"futureConnection1", latch);
+
+ try (Statement stmt = conn.createStatement()) {
+
HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
+ try (ResultSet rs =
+ stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s",
tableName))) {
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
+ Map<String, List<HistogramDistribution>> htableHistograms =
+
HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
+
+ // Assert connection#1 CQSI thread pool metrics
+ String conn1HistogramKey = getHistogramKey(config1);
+
assertHTableThreadPoolHistograms(htableHistograms.get(conn1HistogramKey),
+ conn1HistogramKey, false, zkQuorum1, principal1);
+
+ // Assert connection#2 CQSI thread pool metrics
+ String conn2HistogramKey = getHistogramKey(config2);
+
assertHTableThreadPoolHistograms(htableHistograms.get(conn2HistogramKey),
+ conn2HistogramKey, true, zkQuorum2, principal2);
+
+ // Assert that the two connections use distinct histogram keys
+ Assert.assertNotEquals(conn1HistogramKey, conn2HistogramKey);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }
+
+ private void slowDownConnection(ParallelPhoenixConnection pr,
+ CompletableFuture<PhoenixConnection> pConn, String
futureConnectionField,
+ CountDownLatch latch) throws Exception {
+ Assert.assertTrue(futureConnectionField.equals("futureConnection1")
+ || futureConnectionField.equals("futureConnection2"));
+
+ PhoenixConnection spy = Mockito.spy(pConn.get());
+ doAnswer((invocation) -> {
+ // Block the statement creation until the latch is counted down
+ latch.await();
+ return invocation.callRealMethod();
+ }).when(spy).createStatement();
+
+ // Replace the existing CompletableFuture with the spied
CompletableFuture
+ Field futureField =
ParallelPhoenixConnection.class.getDeclaredField(futureConnectionField);
+ futureField.setAccessible(true);
+ CompletableFuture<PhoenixConnection> spiedFuture =
CompletableFuture.completedFuture(spy);
+ futureField.set(pr, spiedFuture);
+
+ // Verify that the spied CompletableFuture has been set up correctly
+ if (futureConnectionField.equals("futureConnection1")) {
+ Assert.assertSame(spy, pr.getFutureConnection1().get());
+ } else {
+ Assert.assertSame(spy, pr.getFutureConnection2().get());
+ }
+ }
+
+ private String getHistogramKey(Configuration config) throws SQLException {
+ String url =
+ QueryUtil.getConnectionUrl(clientProperties, config,
HBaseTestingUtilityPair.PRINCIPAL);
+ return ConnectionInfo.createNoLogin(url, null, null).toUrl();
+ }
+
+ private void assertHTableThreadPoolHistograms(List<HistogramDistribution>
histograms,
+ String histogramKey, boolean isUsed, String zkQuorum, String
principal) {
+ Assert.assertNotNull(histograms);
+ assertEquals(2, histograms.size());
+ for (HistogramDistribution histogram : histograms) {
+ if (isUsed) {
+ assertTrue(histogram.getCount() > 0);
+ } else {
+ assertEquals(0, histogram.getCount());
+ }
+ assertEquals(zkQuorum,
+
histogram.getTags().get(HTableThreadPoolHistograms.Tag.servers.name()));
+ assertEquals(principal,
+
histogram.getTags().get(HTableThreadPoolHistograms.Tag.cqsiName.name()));
+ }
+ }
+
/**
* Test Phoenix connection creation and basic operations with HBase
cluster(s) unavailable.
*/
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..0d73dabdda
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Assert;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.phoenix.monitoring.HTableThreadPoolHistograms.HistogramName;
+
+public class BaseHTableThreadPoolMetricsIT extends BaseTest {
+ private static final int ROWS_TO_LOAD_INITIALLY = 10000;
+
+ protected void assertHistogramTags(
+ Map<String, List<HistogramDistribution>>
htableThreadPoolHistograms,
+ Map<String, String> expectedTagKeyValues, String histogramKey)
throws SQLException {
+ List<HistogramDistribution> histograms =
htableThreadPoolHistograms.get(histogramKey);
+ Assert.assertEquals(2, histograms.size());
+ for (HistogramDistribution histogram : histograms) {
+ Map<String, String> tags = histogram.getTags();
+
+ // Assert that only the expected tag names are present
+ Assert.assertEquals(expectedTagKeyValues.keySet(), tags.keySet());
+
+ for (Map.Entry<String, String> tag : tags.entrySet()) {
+ String tagName = tag.getKey();
+ Assert.assertEquals(expectedTagKeyValues.get(tagName),
tags.get(tagName));
+ }
+ }
+ }
+
+ protected void assertHTableThreadPoolUsed(
+ Map<String, List<HistogramDistribution>>
htableThreadPoolHistograms, String histogramKey) {
+ boolean foundActiveThreadsCountHistogram = false;
+ boolean foundQueueSizeHistogram = false;
+
+ List<HistogramDistribution> histograms =
htableThreadPoolHistograms.get(histogramKey);
+ // Assert each HTableThreadPoolHistograms has 2 HdrHistograms
+ Assert.assertEquals(2, histograms.size());
+ for (HistogramDistribution histogram : histograms) {
+ if
(histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) {
+
+ // Assert that HTableThreadPool processed at least as many requests
+ // as there are rows, as we are fetching 1 row per scan RPC
+ Assert.assertTrue(ROWS_TO_LOAD_INITIALLY
+ <= histogram.getPercentileDistributionMap().get(
+ PercentileHistogram.NUM_OPS_METRIC_NAME).intValue());
+ foundActiveThreadsCountHistogram = true;
+ }
+ else if
(histogram.getHistoName().equals(HistogramName.QueueSize.name())) {
+ foundQueueSizeHistogram = true;
+ }
+ }
+
+ // Assert that the HTableThreadPool expected to be used was actually
used
+ Assert.assertTrue(foundActiveThreadsCountHistogram);
+ Assert.assertTrue(foundQueueSizeHistogram);
+ }
+
+ protected void assertHTableThreadPoolNotUsed(
+ Map<String, List<HistogramDistribution>>
htableThreadPoolHistograms,
+ String histogramKey) {
+ boolean foundActiveThreadsCountHistogram = false;
+ boolean foundQueueSizeHistogram = false;
+ List<HistogramDistribution> histograms =
htableThreadPoolHistograms.get(histogramKey);
+ Assert.assertEquals(2, histograms.size());
+ for (HistogramDistribution histogram : histograms) {
+ if
(histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) {
+ foundActiveThreadsCountHistogram = true;
+ }
+ else if
(histogram.getHistoName().equals(HistogramName.QueueSize.name())) {
+ foundQueueSizeHistogram = true;
+ }
+
Assert.assertFalse(histogram.getPercentileDistributionMap().isEmpty());
+ for (long value:
histogram.getPercentileDistributionMap().values()) {
+ Assert.assertEquals(0, value);
+ }
+ }
+ Assert.assertTrue(foundActiveThreadsCountHistogram);
+ Assert.assertTrue(foundQueueSizeHistogram);
+ }
+
+ protected Map<String, List<HistogramDistribution>>
runQueryAndGetHistograms(Connection conn,
+
String tableName)
+ throws SQLException {
+ Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+
+ try (Statement stmt = conn.createStatement()) {
+ // Submit one task per row for execution in the HTable thread pool
+ stmt.setFetchSize(1);
+ try (ResultSet rs = stmt.executeQuery("SELECT * FROM " +
tableName)) {
+ int rowsRead = 0;
+ // Reset the histograms
+ PhoenixRuntime.getHTableThreadPoolHistograms();
+ while (rs.next()) {
+ rowsRead++;
+ }
+ Assert.assertEquals(ROWS_TO_LOAD_INITIALLY, rowsRead);
+ htableThreadPoolHistograms =
PhoenixRuntime.getHTableThreadPoolHistograms();
+ }
+ }
+ return htableThreadPoolHistograms;
+ }
+
+ protected void createTableAndUpsertData(Connection conn, String tableName)
throws SQLException {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + tableName + " (k VARCHAR NOT NULL
PRIMARY KEY, v "
+ + "VARCHAR)");
+ }
+ try (PreparedStatement stmt =conn.prepareStatement("UPSERT INTO " +
tableName
+ + " VALUES (?, ?)")) {
+ for (int i = 1; i <= ROWS_TO_LOAD_INITIALLY; i++) {
+ stmt.setString(1, "k" + i);
+ stmt.setString(2, "v" + i);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..7f69578d24
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static
org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class CQSIThreadPoolMetricsIT extends BaseHTableThreadPoolMetricsIT {
+
+ private final String registryClassName;
+ private final Properties props = new Properties();
+
+ /**
+  * @param registryClassName connection registry implementation supplied by {@code data()}
+  *                          for this parameterized run
+  */
+ public CQSIThreadPoolMetricsIT(String registryClassName) {
+     this.registryClassName = registryClassName;
+ }
+
+ /**
+  * Installs a ConfigurationFactory whose configuration enables the CQSI thread pool and
+  * its metrics, then starts the test driver requesting two mini-cluster masters.
+  */
+ @BeforeClass
+ public static void setUp() throws Exception {
+     final Configuration baseConf = HBaseConfiguration.create();
+     setUpConfigForMiniCluster(baseConf);
+     baseConf.setBoolean(CQSI_THREAD_POOL_ENABLED, true);
+     baseConf.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, true);
+
+     InstanceResolver.clearSingletons();
+     // Override to get required config for static fields loaded that require HBase config
+     ConfigurationFactory factory = new ConfigurationFactory() {
+
+         @Override
+         public Configuration getConfiguration() {
+             return baseConf;
+         }
+
+         @Override
+         public Configuration getConfiguration(Configuration confToClone) {
+             Configuration merged = new Configuration(baseConf);
+             merged.addResource(confToClone);
+             return merged;
+         }
+     };
+     InstanceResolver.getSingleton(ConfigurationFactory.class, factory);
+
+     Map<String, String> driverOverrides = new HashMap<>();
+     driverOverrides.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2");
+     setUpTestDriver(new ReadOnlyProps(driverOverrides));
+ }
+
+ /** Selects the connection registry implementation under test for the upcoming test case. */
+ @Before
+ public void testCaseSetup() {
+     props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryClassName);
+ }
+
+ /**
+  * Drops the driver's cached CQSI instances and clears the HTable thread pool histograms
+  * so the next test case starts clean.
+  *
+  * The histogram reset and property cleanup run in a {@code finally} block: if closing a
+  * cached CQSI throws, histograms recorded by this test must still not leak into the next.
+  *
+  * @throws Exception if closing a cached CQSI instance fails
+  */
+ @After
+ public void cleanup() throws Exception {
+     try {
+         driver.cleanUpCQSICache();
+     } finally {
+         HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms();
+         props.clear();
+     }
+ }
+
+ /**
+  * Supplies the connection registry implementations every test in this class runs against.
+  * The run name carries this class's own name so reports do not attribute these runs to
+  * ExternalHTableThreadPoolMetricsIT.
+  *
+  * @return the ZK registry name plus the RPC and Master registry class names
+  */
+ @Parameterized.Parameters(name = "CQSIThreadPoolMetricsIT_registryClassName={0}")
+ public static synchronized Collection<String> data() {
+     return Arrays.asList(ZKConnectionInfo.ZK_REGISTRY_NAME,
+             "org.apache.hadoop.hbase.client.RpcConnectionRegistry",
+             "org.apache.hadoop.hbase.client.MasterRegistry");
+ }
+
+ /**
+  * Checks that the "service1" and "service2" connection profiles each get their own
+  * HTable thread pool histograms, and that a query issued through "service2" leaves the
+  * "service1" histograms unused while keeping their tags intact.
+  */
+ @Test
+ public void testHistogramsPerConnInfo() throws Exception {
+     final String service1 = "service1";
+     final String service2 = "service2";
+     final String tableName = generateUniqueName();
+     String service1Key;
+     Map<String, List<HistogramDistribution>> histograms;
+
+     // Create a connection for "service1" connection profile
+     final String service1Url =
+             QueryUtil.getConnectionUrl(props, utility.getConfiguration(), service1);
+     try (Connection conn = driver.connect(service1Url, props)) {
+         createTableAndUpsertData(conn, tableName);
+
+         histograms = runQueryAndGetHistograms(conn, tableName);
+
+         service1Key = getHistogramKey(service1Url);
+         assertHTableThreadPoolUsed(histograms, service1Key);
+         assertHistogramTags(histograms,
+                 getExpectedTagKeyValues(service1Url, service1), service1Key);
+     }
+
+     // Create a connection for "service2" connection profile
+     final String service2Url =
+             QueryUtil.getConnectionUrl(props, utility.getConfiguration(), service2);
+     histograms = PhoenixRuntime.getHTableThreadPoolHistograms();
+     // Assert that HTableThreadPoolHistograms for service2 is not there yet
+     Assert.assertNull(histograms.get(getHistogramKey(service2Url)));
+     try (Connection conn = driver.connect(service2Url, props)) {
+         histograms = runQueryAndGetHistograms(conn, tableName);
+
+         // service1's histograms are still reported but saw no traffic from this query.
+         // The expected tags are built from the service2 URL with the service1 name,
+         // exactly as in the original test.
+         assertHTableThreadPoolNotUsed(histograms, service1Key);
+         assertHistogramTags(histograms,
+                 getExpectedTagKeyValues(service2Url, service1), service1Key);
+
+         // We have HTableThreadPoolHistograms for service1 and service2 CQSI instances
+         String service2Key = getHistogramKey(service2Url);
+         assertHTableThreadPoolUsed(histograms, service2Key);
+         assertHistogramTags(histograms,
+                 getExpectedTagKeyValues(service2Url, service2), service2Key);
+     }
+ }
+
+ /**
+  * With the CQSI thread pool enabled but its metrics disabled through connection
+  * properties, no histograms may be reported under the connection's histogram key.
+  */
+ @Test
+ public void testCQSIThreadPoolHistogramsDisabled() throws Exception {
+     final String tableName = generateUniqueName();
+     props.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, "false");
+     props.setProperty(CQSI_THREAD_POOL_ENABLED, "true");
+     final String url =
+             QueryUtil.getConnectionUrl(props, utility.getConfiguration(), "service1");
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+
+         Map<String, List<HistogramDistribution>> histograms =
+                 runQueryAndGetHistograms(conn, tableName);
+         Assert.assertNull(histograms.get(getHistogramKey(url)));
+     }
+ }
+
+ /**
+  * A connection opened without an explicit CQSI name must report used histograms tagged
+  * with DEFAULT_QUERY_SERVICES_NAME.
+  */
+ @Test
+ public void testDefaultCQSIHistograms() throws Exception {
+     final String tableName = generateUniqueName();
+     final String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration());
+
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+
+         Map<String, List<HistogramDistribution>> histograms =
+                 runQueryAndGetHistograms(conn, tableName);
+
+         final String key = getHistogramKey(url);
+         assertHTableThreadPoolUsed(histograms, key);
+         assertHistogramTags(histograms,
+                 getExpectedTagKeyValues(url, DEFAULT_QUERY_SERVICES_NAME), key);
+     }
+ }
+
+ /**
+  * After the driver's CQSI cache is dropped, reconnecting with the same URL must keep
+  * reporting under the same histogram key: the only keys allowed in the result are that
+  * key and the key of the default CQSI.
+  */
+ @Test
+ public void testThreadPoolHistogramsSharedAcrossCQSIWithSameConnInfo() throws Exception {
+     final String cqsiName = "service1";
+     final String tableName = generateUniqueName();
+     String sharedKey;
+     Map<String, List<HistogramDistribution>> histograms;
+
+     // Create a connection for "service1" connection profile
+     final String url =
+             QueryUtil.getConnectionUrl(props, utility.getConfiguration(), cqsiName);
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+
+         histograms = runQueryAndGetHistograms(conn, tableName);
+
+         sharedKey = getHistogramKey(url);
+         assertHTableThreadPoolUsed(histograms, sharedKey);
+         assertHistogramTags(histograms, getExpectedTagKeyValues(url, cqsiName), sharedKey);
+     }
+
+     driver.cleanUpCQSICache();
+     try (Connection conn = driver.connect(url, props)) {
+         histograms = runQueryAndGetHistograms(conn, tableName);
+         // Assert that no new HTableThreadPoolHistograms instance was created for a new CQSI
+         // instance
+         String defaultCqsiKey = getHistogramKey(
+                 QueryUtil.getConnectionUrl(new Properties(), utility.getConfiguration()));
+         Set<String> allowedKeys = new HashSet<>();
+         allowedKeys.add(defaultCqsiKey);
+         allowedKeys.add(sharedKey);
+         Assert.assertTrue(allowedKeys.containsAll(histograms.keySet()));
+
+         assertHTableThreadPoolUsed(histograms, sharedKey);
+         assertHistogramTags(histograms, getExpectedTagKeyValues(url, cqsiName), sharedKey);
+     }
+ }
+
+ /**
+  * Derives the key these tests use to look up the histograms of a connection URL: the
+  * URL form of the ConnectionInfo created (without login) from {@code url}.
+  *
+  * @param url JDBC connection URL
+  * @return {@code toUrl()} of the ConnectionInfo built from {@code url}
+  * @throws SQLException if the URL cannot be turned into a ConnectionInfo
+  */
+ private String getHistogramKey(String url) throws SQLException {
+     ConnectionInfo connInfo = ConnectionInfo.createNoLogin(url, null, null);
+     return connInfo.toUrl();
+ }
+
+ private Map<String, String> getExpectedTagKeyValues(String url, String
cqsiName)
+ throws SQLException {
+ Map<String, String> expectedTagKeyValues = new HashMap<>();
+ ConnectionInfo connInfo = ConnectionInfo.createNoLogin(url, null,
null);
+ if (registryClassName.equals(ZKConnectionInfo.ZK_REGISTRY_NAME)) {
+
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(),
+ ((ZKConnectionInfo) connInfo).getZkHosts());
+ }
+ else {
+
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(),
+ ((AbstractRPCConnectionInfo)
connInfo).getBoostrapServers());
+ }
+
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.cqsiName.name(),
cqsiName);
+ return expectedTagKeyValues;
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..8a043b5572
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.HTableFactory;
+import org.apache.phoenix.query.QueryServices;
+import
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static
org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class ExternalHTableThreadPoolMetricsIT extends
BaseHTableThreadPoolMetricsIT{
+ private static final int MAX_THREADS_IN_EXTERNAL_THREAD_POOL = 10;
+ private static final int QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL = 100;
+ private static final String TAG_NAME = "cluster";
+ private static final Map<String, String> tagValues = new HashMap<>();
+ private static final String THREAD_POOL_1A =
"external_thread_pool_1".toUpperCase();
+ private static final String THREAD_POOL_2A =
"external_thread_pool_2".toUpperCase();
+ private static final String HISTOGRAM_DISABLED_THREAD_POOL =
+ "histogram_disabled_thread_pool".toUpperCase();
+ private static final String NULL_SUPPLIER_THREAD_POOL =
"null_supplier_thread_pool".toUpperCase();
+ private static final String NO_TAGS_THREAD_POOL =
"no_tags_thread_pool".toUpperCase();
+
+ private final String registryClassName;
+ private final Properties props = new Properties();
+
+ /**
+  * @param registryClassName connection registry implementation supplied by {@code data()}
+  *                          for this parameterized run
+  */
+ public ExternalHTableThreadPoolMetricsIT(String registryClassName) {
+     this.registryClassName = registryClassName;
+ }
+
+ /**
+  * Builds an HTableThreadPoolWithUtilizationStats named {@code threadPoolName} with
+  * MAX_THREADS_IN_EXTERNAL_THREAD_POOL core and maximum threads, a 30 second keep-alive,
+  * a queue bounded at QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL and daemon worker threads.
+  *
+  * @param threadPoolName pool name; also selects which histogram supplier the pool gets
+  * @return the newly created executor
+  */
+ private static ThreadPoolExecutor createThreadPoolExecutor(String threadPoolName) {
+     final Supplier<HTableThreadPoolHistograms> histogramSupplier =
+             getHTableThreadPoolHistogramsSupplier(threadPoolName);
+     final BlockingQueue<Runnable> boundedQueue =
+             new LinkedBlockingQueue<>(QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+     final ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
+     factoryBuilder.setDaemon(true);
+     factoryBuilder.setNameFormat(threadPoolName + "-shared-pool-%d");
+     factoryBuilder.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
+     final ThreadFactory daemonFactory = factoryBuilder.build();
+
+     // The anonymous subclass only delegates execute() to the parent; it is kept as in the
+     // original so the instantiated type is unchanged.
+     return new HTableThreadPoolWithUtilizationStats(
+             MAX_THREADS_IN_EXTERNAL_THREAD_POOL, MAX_THREADS_IN_EXTERNAL_THREAD_POOL,
+             30, TimeUnit.SECONDS, boundedQueue, daemonFactory,
+             threadPoolName, histogramSupplier) {
+         @Override
+         public void execute(Runnable command) {
+             super.execute(command);
+         }
+     };
+ }
+
+ /**
+  * Picks the histogram supplier for a pool by name:
+  * HISTOGRAM_DISABLED_THREAD_POOL gets no supplier at all ({@code null}),
+  * NULL_SUPPLIER_THREAD_POOL gets a supplier that yields {@code null},
+  * NO_TAGS_THREAD_POOL gets histograms without tags, and every other pool gets
+  * histograms tagged TAG_NAME with the value registered for it in {@code tagValues}.
+  *
+  * @param threadPoolName name of the pool the supplier is created for
+  * @return the supplier, or {@code null} for the histogram-disabled pool
+  */
+ private static Supplier<HTableThreadPoolHistograms> getHTableThreadPoolHistogramsSupplier(
+         String threadPoolName) {
+     if (threadPoolName.equals(HISTOGRAM_DISABLED_THREAD_POOL)) {
+         return null;
+     }
+     if (threadPoolName.equals(NULL_SUPPLIER_THREAD_POOL)) {
+         return () -> null;
+     }
+     if (threadPoolName.equals(NO_TAGS_THREAD_POOL)) {
+         return () -> new HTableThreadPoolHistograms(MAX_THREADS_IN_EXTERNAL_THREAD_POOL,
+                 QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+     }
+     return () -> {
+         HTableThreadPoolHistograms tagged = new HTableThreadPoolHistograms(
+                 MAX_THREADS_IN_EXTERNAL_THREAD_POOL, QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+         // The tag value is looked up when the supplier runs, not when it is created
+         tagged.addTag(TAG_NAME, tagValues.get(threadPoolName));
+         return tagged;
+     };
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ tagValues.put(THREAD_POOL_1A, "hbase1a");
+ tagValues.put(THREAD_POOL_2A, "hbase2a");
+
+ final Configuration conf = HBaseConfiguration.create();
+ setUpConfigForMiniCluster(conf);
+
+ InstanceResolver.clearSingletons();
+ // Override to get required config for static fields loaded that
require HBase config
+ InstanceResolver.getSingleton(ConfigurationFactory.class, new
ConfigurationFactory() {
+
+ @Override public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override public Configuration getConfiguration(Configuration
confToClone) {
+ Configuration copy = new Configuration(conf);
+ copy.addResource(confToClone);
+ return copy;
+ }
+ });
+ ThreadPoolExecutor executorFor1a =
createThreadPoolExecutor(THREAD_POOL_1A);
+ ThreadPoolExecutor executorFor2a =
createThreadPoolExecutor(THREAD_POOL_2A);
+ ThreadPoolExecutor histogramDisabledExecutor =
+ createThreadPoolExecutor(HISTOGRAM_DISABLED_THREAD_POOL);
+ ThreadPoolExecutor nullSupplierExecutor =
+ createThreadPoolExecutor(NULL_SUPPLIER_THREAD_POOL);
+ ThreadPoolExecutor defaultExecutor =
createThreadPoolExecutor(NO_TAGS_THREAD_POOL);
+ InstanceResolver.getSingleton(HTableFactory.class, new
HTableFactory.HTableFactoryImpl() {
+ @Override
+ public Table getTable(byte[] tableName,
+ org.apache.hadoop.hbase.client.Connection
connection,
+ ExecutorService pool) throws IOException {
+ if
(Bytes.toString(tableName).startsWith(HISTOGRAM_DISABLED_THREAD_POOL)) {
+ return super.getTable(tableName, connection,
histogramDisabledExecutor);
+ }
+ else if
(Bytes.toString(tableName).startsWith(NULL_SUPPLIER_THREAD_POOL)) {
+ return super.getTable(tableName, connection,
nullSupplierExecutor);
+ }
+ else if (Bytes.toString(tableName).startsWith(THREAD_POOL_1A))
{
+ return super.getTable(tableName, connection,
executorFor1a);
+ }
+ else if (Bytes.toString(tableName).startsWith(THREAD_POOL_2A))
{
+ return super.getTable(tableName, connection,
executorFor2a);
+ }
+ else {
+ return super.getTable(tableName, connection,
defaultExecutor);
+ }
+ }
+ });
+
+ Map<String, String> props = new HashMap<>();
+ props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2");
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ /** Selects the connection registry implementation under test for the upcoming test case. */
+ @Before
+ public void testCaseSetup() {
+     props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryClassName);
+ }
+
+ /** Clears the per-test connection properties and the HTable thread pool histograms. */
+ @After
+ public void cleanup() throws Exception {
+     props.clear();
+     HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms();
+ }
+
+ /**
+  * Supplies the connection registry implementations every test in this class runs against.
+  *
+  * @return the ZK registry name plus the RPC and Master registry class names
+  */
+ @Parameterized.Parameters(name = "ExternalHTableThreadPoolMetricsIT_registryClassName={0}")
+ public static synchronized Collection<String> data() {
+     final String[] registries = {
+         ZKConnectionInfo.ZK_REGISTRY_NAME,
+         "org.apache.hadoop.hbase.client.RpcConnectionRegistry",
+         "org.apache.hadoop.hbase.client.MasterRegistry"
+     };
+     return Arrays.asList(registries);
+ }
+
+ /**
+  * Routes one table to the hbase1a pool and another to the hbase2a pool and checks that
+  * each pool reports its own histograms with its own "cluster" tag, and that traffic to
+  * the second pool leaves the first pool's histograms unused.
+  */
+ @Test
+ public void testHistogramsPerHTableThreadPool() throws Exception {
+     final String tableOn1a = THREAD_POOL_1A + "." + generateUniqueName();
+     final String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration());
+     Map<String, List<HistogramDistribution>> histograms;
+
+     // Send traffic to HTable thread pool for hbase1a
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableOn1a);
+
+         histograms = runQueryAndGetHistograms(conn, tableOn1a);
+         assertHTableThreadPoolUsed(histograms, THREAD_POOL_1A);
+         Assert.assertNull(histograms.get(THREAD_POOL_2A));
+
+         Map<String, String> tagsFor1a = new HashMap<>();
+         tagsFor1a.put(TAG_NAME, tagValues.get(THREAD_POOL_1A));
+         assertHistogramTags(histograms, tagsFor1a, THREAD_POOL_1A);
+     }
+
+     // Send traffic to HTable thread pool for hbase2a
+     final String tableOn2a = THREAD_POOL_2A + "." + generateUniqueName();
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableOn2a);
+
+         histograms = runQueryAndGetHistograms(conn, tableOn2a);
+
+         // We will have a HTable thread pool for hbase1a also
+         assertHTableThreadPoolUsed(histograms, THREAD_POOL_2A);
+         assertHTableThreadPoolNotUsed(histograms, THREAD_POOL_1A);
+
+         Map<String, String> tagsFor1a = new HashMap<>();
+         tagsFor1a.put(TAG_NAME, tagValues.get(THREAD_POOL_1A));
+         assertHistogramTags(histograms, tagsFor1a, THREAD_POOL_1A);
+
+         Map<String, String> tagsFor2a = new HashMap<>();
+         tagsFor2a.put(TAG_NAME, tagValues.get(THREAD_POOL_2A));
+         assertHistogramTags(histograms, tagsFor2a, THREAD_POOL_2A);
+     }
+ }
+
+ /**
+  * A table routed to the pool created without any histogram supplier must not produce
+  * an entry under HISTOGRAM_DISABLED_THREAD_POOL.
+  */
+ @Test
+ public void testHistogramDisabled() throws Exception {
+     final String tableName = HISTOGRAM_DISABLED_THREAD_POOL + "." + generateUniqueName();
+     final String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration());
+
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+         Map<String, List<HistogramDistribution>> histograms =
+                 runQueryAndGetHistograms(conn, tableName);
+         Assert.assertNull(histograms.get(HISTOGRAM_DISABLED_THREAD_POOL));
+     }
+ }
+
+ /**
+  * A table routed to the pool whose supplier yields {@code null} must not produce an
+  * entry under NULL_SUPPLIER_THREAD_POOL.
+  */
+ @Test
+ public void testNullHistogramSupplier() throws Exception {
+     final String tableName = NULL_SUPPLIER_THREAD_POOL + "." + generateUniqueName();
+     final String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration());
+
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+         Map<String, List<HistogramDistribution>> histograms =
+                 runQueryAndGetHistograms(conn, tableName);
+         Assert.assertNull(histograms.get(NULL_SUPPLIER_THREAD_POOL));
+     }
+ }
+
+ /**
+  * A table whose name carries none of the pool prefixes is expected to be served by the
+  * NO_TAGS_THREAD_POOL executor; its histograms must be used and carry no tags.
+  */
+ @Test
+ public void testHistogramsWithoutTags() throws Exception {
+     final String tableName = generateUniqueName();
+     final String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration());
+
+     try (Connection conn = driver.connect(url, props)) {
+         createTableAndUpsertData(conn, tableName);
+         Map<String, List<HistogramDistribution>> histograms =
+                 runQueryAndGetHistograms(conn, tableName);
+         assertHTableThreadPoolUsed(histograms, NO_TAGS_THREAD_POOL);
+         assertHistogramTags(histograms, new HashMap<>(), NO_TAGS_THREAD_POOL);
+     }
+ }
+}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 368a9c52a4..552f0d41e1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -107,7 +107,14 @@ public class PhoenixTestDriver extends
PhoenixEmbeddedDriver {
connectionQueryServicesMap.put(connInfo, connectionQueryServices);
return connectionQueryServices;
}
-
+
+ public synchronized void cleanUpCQSICache() throws SQLException {
+ for (ConnectionQueryServices service :
connectionQueryServicesMap.values()) {
+ service.close();
+ }
+ connectionQueryServicesMap.clear();
+ }
+
private synchronized void checkClosed() {
if (closed) {
throw new IllegalStateException("The Phoenix jdbc test driver has
been closed.");
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index bc011d05b6..39714454bc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -552,8 +553,14 @@ public abstract class BaseTest {
utility = new HBaseTestingUtility(conf);
try {
long startTime = System.currentTimeMillis();
- utility.startMiniCluster(overrideProps.getInt(
- QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS,
NUM_SLAVES_BASE));
+ StartMiniClusterOption.Builder builder =
StartMiniClusterOption.builder();
+
builder.numMasters(overrideProps.getInt(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS,
+ 1));
+ int numSlaves = overrideProps.getInt(
+ QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS,
NUM_SLAVES_BASE);
+ builder.numRegionServers(numSlaves);
+ builder.numDataNodes(numSlaves);
+ utility.startMiniCluster(builder.build());
long startupTime = System.currentTimeMillis()-startTime;
LOGGER.info("HBase minicluster startup complete in {} ms",
startupTime);
return getLocalClusterUrl(utility);