KeeProMise commented on code in PR #8548:
URL: https://github.com/apache/hadoop/pull/8548#discussion_r3434343420


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -510,44 +515,75 @@ public RouterRpcServer(Configuration conf, Router router,
   public void initAsyncThreadPools(Configuration configuration) {
     Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
     allConfiguredNS.add(CONCURRENT_NS);
-    Map<String, Integer> nsAsyncActiveHandlerCount = parseNsAsyncHandlerCount(configuration);
-    initAsyncHandlerThreadPools(configuration, allConfiguredNS, nsAsyncActiveHandlerCount);
+    Map<String, Integer> nsAsyncActiveHandlerCount = parseNsAsyncHandlerCount(configuration,
+        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+    initAsyncHandlerThreadPools(configuration, allConfiguredNS, false,
+        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT,
+        nsAsyncActiveHandlerCount, asyncRouterHandlerExecutors);
+
+    Map<String, Integer> nsAsyncObserverHandlerCount = parseNsAsyncHandlerCount(configuration,
+        DFS_ROUTER_ASYNC_RPC_NS_OBSERVER_HANDLER_COUNT_KEY);
+    initAsyncHandlerThreadPools(configuration, allConfiguredNS, true,
+        DFS_ROUTER_ASYNC_RPC_OBSERVER_HANDLER_COUNT_KEY,
+        DFS_ROUTER_ASYNC_RPC_OBSERVER_HANDLER_COUNT_DEFAULT, nsAsyncObserverHandlerCount,
+        asyncRouterOBHandlerExecutors);
+
     initAsyncResponderThreadPools(configuration);
   }
 
-  private void initAsyncHandlerThreadPools(Configuration configuration,
-      Set<String> allConfiguredNS, Map<String, Integer> nsAsyncHandlerCount) {
-    LOG.info("Initializing asynchronous handler thread pools");
+  /**
+   * Initializes async handler executors for each configured namespace.
+   *
+   * @param configuration        router configuration
+   * @param allConfiguredNS      set of all configured namespace IDs
+   * @param useObserver         true if these handlers serve observer namenodes
+   * @param handlerCountKey     key for the default handler count
+   * @param handlerCountDefault default handler count
+   * @param nsAsyncHandlerCount per-namespace handler counts
+   * @param executors           executor map to populate
+   */
+  private void initAsyncHandlerThreadPools(Configuration configuration, Set<String> allConfiguredNS,
+      boolean useObserver, String handlerCountKey, int handlerCountDefault,
+      Map<String, Integer> nsAsyncHandlerCount, Map<String, ThreadPoolExecutor> executors) {
+    String namenodeTypeForLogging = useObserver ? "Observer Namenode" : "Active Namenode";
     int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
         DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
     if (asyncQueueSize < 1) {
       throw new IllegalArgumentException("Async queue size must be at least 1");
     }
-    int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
-        DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+    int asyncHandlerCountDefault = configuration.getInt(handlerCountKey, handlerCountDefault);
+    // Skip separate observer handlers and let them share with active if not configured
+    if (asyncHandlerCountDefault < 1 && nsAsyncHandlerCount.isEmpty() && useObserver) {
+      LOG.info("Async observer handlers are not configured, skipping...");
+      return;
+    } else {
+      useSeparateAsyncRouterOBHandlerExecutors = true;
+    }

Review Comment:
   In `initAsyncHandlerThreadPools`:

   ```java
   if (asyncHandlerCountDefault < 1 && nsAsyncHandlerCount.isEmpty() && useObserver) {
       return;
   } else {
       useSeparateAsyncRouterOBHandlerExecutors = true;
   }
   ```

   This method is called twice: first for active (`useObserver=false`), then for observer (`useObserver=true`). On the first call, `useObserver=false` guarantees the `if` condition is false, so the `else` branch sets the flag to true unconditionally. By the time the second call runs, the flag is already true regardless of whether observer handlers are configured.

   The flag effectively never works as a gate. It happens to be functionally correct by accident (an empty `asyncRouterOBHandlerExecutors` map falls back to `routerDefaultAsyncHandlerExecutor`), but when observer is not configured, `getAsyncExecutorForNamespace(nsId, true)` returns the global default executor instead of the per-ns active executor from `asyncRouterHandlerExecutors`, which seems unintended.
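
To make the call sequence concrete, here is a minimal standalone sketch of the gate. It is not the PR's code: `ObserverGateSketch`, `initAsWritten`, `initObserverOnly`, and the handler counts `10` and `0` are hypothetical stand-ins, and the real method also builds executors, which is omitted here. `initObserverOnly` is just one possible way to restrict the flag to the observer call, not necessarily the fix the author should adopt.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical model of the flag logic only; no thread pools are created. */
public class ObserverGateSketch {
  /** Stand-in for useSeparateAsyncRouterOBHandlerExecutors. */
  private boolean useSeparateObserverExecutors = false;

  /** Mirrors the gate as written in the diff: every non-skipped call sets the flag. */
  public void initAsWritten(int handlerCountDefault, Map<String, Integer> nsHandlerCount,
      boolean useObserver) {
    if (handlerCountDefault < 1 && nsHandlerCount.isEmpty() && useObserver) {
      return;
    } else {
      useSeparateObserverExecutors = true;
    }
  }

  /** One possible alternative (assumption): only a configured observer call sets the flag. */
  public void initObserverOnly(int handlerCountDefault, Map<String, Integer> nsHandlerCount,
      boolean useObserver) {
    if (useObserver) {
      if (handlerCountDefault < 1 && nsHandlerCount.isEmpty()) {
        return; // observer handlers not configured, leave the flag untouched
      }
      useSeparateObserverExecutors = true;
    }
  }

  public boolean usesSeparateObserverExecutors() {
    return useSeparateObserverExecutors;
  }

  public static void main(String[] args) {
    Map<String, Integer> none = Collections.emptyMap();

    ObserverGateSketch asWritten = new ObserverGateSketch();
    asWritten.initAsWritten(10, none, false); // active call, illustrative count
    asWritten.initAsWritten(0, none, true);   // observer call, nothing configured
    System.out.println("as written: " + asWritten.usesSeparateObserverExecutors());

    ObserverGateSketch observerOnly = new ObserverGateSketch();
    observerOnly.initObserverOnly(10, none, false);
    observerOnly.initObserverOnly(0, none, true);
    System.out.println("observer only: " + observerOnly.usesSeparateObserverExecutors());
  }
}
```

Running `main` prints `as written: true` followed by `observer only: false`: with the gate as written, the active call alone flips the flag, so the later observer call cannot influence it.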



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
