This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 5b1093ea43875e14cff1b4490fcc7f846b765cdb Author: zhangduo <[email protected]> AuthorDate: Fri Jun 7 20:47:43 2019 +0800 HBASE-22550 Do not use Threads.newDaemonThreadFactory in ConnectionUtils.getThreadPool --- .../client/ConnectionOverAsyncConnection.java | 35 ++++++++++++++++++++-- .../hadoop/hbase/client/ConnectionUtils.java | 27 ----------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 861aab0..e66733d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.log.HBaseMarkers; @@ -30,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The connection implementation based on {@link AsyncConnection}. @@ -41,6 +46,10 @@ class ConnectionOverAsyncConnection implements Connection { private volatile boolean aborted = false; + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. private volatile ExecutorService batchPool = null; private final AsyncConnectionImpl conn; @@ -134,13 +143,33 @@ class ConnectionOverAsyncConnection implements Connection { return conn.isClosed(); } + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. + private ThreadPoolExecutor createThreadPool() { + Configuration conf = conn.getConfiguration(); + int threads = conf.getInt("hbase.hconnection.threads.max", 256); + long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); + BlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); + ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime, + TimeUnit.SECONDS, workQueue, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build()); + tpe.allowCoreThreadTimeOut(true); + return tpe; + } + + // only used for executing coprocessor calls, as users may reference the methods in the + // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread... + // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin + // interface. private ExecutorService getBatchPool() { if (batchPool == null) { synchronized (this) { if (batchPool == null) { - int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256); - this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads, - () -> toString() + "-shared", null); + this.batchPool = createThreadPool(); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 2ac3cd2..f1cf988 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -29,12 +29,9 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; import org.apache.yetus.audience.InterfaceAudience; @@ -646,29 +642,6 @@ public final class ConnectionUtils { return future; } - static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, int coreThreads, - Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) { - // shared HTable thread executor not yet initialized - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; - } - if (coreThreads == 0) { - coreThreads = Runtime.getRuntime().availableProcessors() * 8; - } - long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); - BlockingQueue<Runnable> workQueue = passedWorkQueue; - if (workQueue == null) { - workQueue = - new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); - coreThreads = maxThreads; - } - ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, - TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get())); - tpe.allowCoreThreadTimeOut(true); - return tpe; - } - static void shutdownPool(ExecutorService pool) { pool.shutdown(); try {
