This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5b1093ea43875e14cff1b4490fcc7f846b765cdb
Author: zhangduo <[email protected]>
AuthorDate: Fri Jun 7 20:47:43 2019 +0800

    HBASE-22550 Do not use Threads.newDaemonThreadFactory in 
ConnectionUtils.getThreadPool
---
 .../client/ConnectionOverAsyncConnection.java      | 35 ++++++++++++++++++++--
 .../hadoop/hbase/client/ConnectionUtils.java       | 27 -----------------
 2 files changed, 32 insertions(+), 30 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index 861aab0..e66733d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
@@ -30,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The connection implementation based on {@link AsyncConnection}.
@@ -41,6 +46,10 @@ class ConnectionOverAsyncConnection implements Connection {
 
   private volatile boolean aborted = false;
 
+  // only used for executing coprocessor calls, as users may reference the 
methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in 
a separate thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in 
Table and Admin
+  // interface.
   private volatile ExecutorService batchPool = null;
 
   private final AsyncConnectionImpl conn;
@@ -134,13 +143,33 @@ class ConnectionOverAsyncConnection implements Connection 
{
     return conn.isClosed();
   }
 
+  // only used for executing coprocessor calls, as users may reference the 
methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in 
a separate thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in 
Table and Admin
+  // interface.
+  private ThreadPoolExecutor createThreadPool() {
+    Configuration conf = conn.getConfiguration();
+    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
+    long keepAliveTime = 
conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+    BlockingQueue<Runnable> workQueue =
+      new LinkedBlockingQueue<>(threads * 
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, 
keepAliveTime,
+      TimeUnit.SECONDS, workQueue,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + 
"-shared-%d").build());
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  // only used for executing coprocessor calls, as users may reference the 
methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in 
a separate thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in 
Table and Admin
+  // interface.
   private ExecutorService getBatchPool() {
     if (batchPool == null) {
       synchronized (this) {
         if (batchPool == null) {
-          int threads = 
conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
-          this.batchPool = 
ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
-            () -> toString() + "-shared", null);
+          this.batchPool = createThreadPool();
         }
       }
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2ac3cd2..f1cf988 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -29,12 +29,9 @@ import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -646,29 +642,6 @@ public final class ConnectionUtils {
     return future;
   }
 
-  static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, 
int coreThreads,
-      Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) {
-    // shared HTable thread executor not yet initialized
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    long keepAliveTime = 
conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
-    BlockingQueue<Runnable> workQueue = passedWorkQueue;
-    if (workQueue == null) {
-      workQueue =
-        new LinkedBlockingQueue<>(maxThreads * 
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-      coreThreads = maxThreads;
-    }
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, 
keepAliveTime,
-      TimeUnit.SECONDS, workQueue, 
Threads.newDaemonThreadFactory(threadName.get()));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
-  }
-
   static void shutdownPool(ExecutorService pool) {
     pool.shutdown();
     try {

Reply via email to