[ 
https://issues.apache.org/jira/browse/HDFS-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913989#comment-17913989
 ] 

ASF GitHub Bot commented on HDFS-17651:
---------------------------------------

Hexiaoqiao commented on code in PR #7244:
URL: https://github.com/apache/hadoop/pull/7244#discussion_r1919698079


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml:
##########
@@ -49,13 +49,39 @@
   </property>
 
   <property>
-    <name>dfs.federation.router.rpc.async.enable</name>
+    <name>dfs.federation.router.async.rpc.enable</name>
     <value>false</value>
     <description>
       If true, router will process the RPC request asynchronously.
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.async.rpc.ns.handler.count</name>
+    <value>nsPlaceholder1:0,nsPlaceholder2:0</value>

Review Comment:
   Suggest leaving this empty, because it is not a valid value. Thanks.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java:
##########
@@ -72,15 +72,23 @@ public class RBFConfigKeys extends 
CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_RPC_ENABLE =
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
-  public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
-  public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
-  public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
-  public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
-      FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
-  public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
+  // HDFS Router Asynchronous RPC
+  public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
+      FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
+  public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
+  public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
+          FEDERATION_ROUTER_PREFIX + "async.rpc.";
+  // Example: ns1:count1,ns2:count2,ns3:count3
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
+          FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
+  public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT =

Review Comment:
   Leave this empty too, as in the comment above.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);

Review Comment:
   I'm not fond of the format of this value, but I don't have a better idea right now. ^_^



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The config key: {} is incorrect! The value is empty.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+      configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
     }
-    AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
+    String[] nsHandlers = configNsHandler.split(",");
+    for (String nsHandlerInfo : nsHandlers) {
+      String[] nsHandlerItems = nsHandlerInfo.split(":");
+      if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) 
||
+          !StringUtils.isNumeric(nsHandlerItems[1])) {
+        LOG.error("The config key: {} is incorrect! The value is {}.",
+            DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
+        continue;
+      }
+      nsAsyncHandlerCount.put(nsHandlerItems[0], 
Integer.parseInt(nsHandlerItems[1]));

Review Comment:
   Could this cause unexpected behavior if `nsHandlerItems[0]` is not a valid 
nameservice name? Would it be better to validate it first?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -491,23 +500,78 @@ public RouterRpcServer(Configuration conf, Router router,
 
   /**
    * Init router async handlers and router async responders.
+   * @param configuration the configuration.
    */
-  public void initAsyncThreadPool() {
-    int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
-    int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
-        DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
-    if (asyncRouterHandler == null) {
-      LOG.info("init router async handler count: {}", asyncHandlerCount);
-      asyncRouterHandler = Executors.newFixedThreadPool(
-          asyncHandlerCount, new AsyncThreadFactory("router async handler "));
+  public void initAsyncThreadPools(Configuration configuration) {
+    LOG.info("Begin initialize asynchronous handler and responder thread 
pool.");
+    initNsAsyncHandlerCount();
+    Set<String> allConfiguredNS = 
FederationUtil.getAllConfiguredNS(configuration);
+    Set<String> unassignedNS = new HashSet<>();
+    allConfiguredNS.add(CONCURRENT_NS);
+
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
+        LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, 
nsId);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    int asyncHandlerCountDefault = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+
+    if (!unassignedNS.isEmpty()) {
+      LOG.warn("Async handler unassigned ns: {}", unassignedNS);
+      LOG.info("Use default async handler count {} for unassigned ns.", 
asyncHandlerCountDefault);
+      for (String nsId : unassignedNS) {
+        initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
+      }
     }
-    if (asyncRouterResponder == null) {
-      LOG.info("init router async responder count: {}", asyncResponderCount);
-      asyncRouterResponder = Executors.newFixedThreadPool(
-          asyncResponderCount, new AsyncThreadFactory("router async responder 
"));
+
+    int asyncResponderCount = 
configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
+        DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
+    if (routerAsyncResponderExecutor == null) {
+      LOG.info("Initialize router async responder count: {}", 
asyncResponderCount);
+      routerAsyncResponderExecutor = Executors.newFixedThreadPool(
+          asyncResponderCount, new AsyncThreadFactory("Router Async Responder 
#"));
+    }
+    
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
+
+    if (routerDefaultAsyncHandlerExecutor == null) {
+      LOG.info("init router async default executor handler count: {}", 
asyncHandlerCountDefault);
+      routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
+          asyncHandlerCountDefault, new AsyncThreadFactory("Router Async 
Default Handler #"));
+    }
+  }
+
+  private void initNsAsyncHandlerCount() {
+    String configNsHandler = 
conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
+    if (StringUtils.isEmpty(configNsHandler)) {
+      LOG.error(
+          "The config key: {} is incorrect! The value is empty.",
+          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+      configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;

Review Comment:
   The default value 'nsPlaceholder1:0,nsPlaceholder2:0' is invalid; we should 
avoid using it in a production cluster. I suggest leaving this default value empty 
and giving every `ns` one static value, such as 10, as the current implementation does.





> [ARR] Async handler executor isolation.
> ---------------------------------------
>
>                 Key: HDFS-17651
>                 URL: https://issues.apache.org/jira/browse/HDFS-17651
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: farmmamba
>            Assignee: farmmamba
>            Priority: Major
>              Labels: pull-request-available
>
> The main purpose of this PR is to isolate nameservices from each other by giving 
> each nameservice its own async handler thread pool.
> Consider the following situation:
> When a downstream nameservice ns1 performs poorly, the threads in the shared async 
> handler thread pool become occupied adding calls to ns1's 
> Connection#calls. No threads are then available to asynchronously handle RPC 
> requests for the other, healthy nameservices. Therefore, it is better to isolate 
> the async handler thread pools of different nameservices.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to