KeeProMise commented on code in PR #8548:
URL: https://github.com/apache/hadoop/pull/8548#discussion_r3434343420
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java:
##########
@@ -510,44 +515,75 @@ public RouterRpcServer(Configuration conf, Router router,
public void initAsyncThreadPools(Configuration configuration) {
Set<String> allConfiguredNS =
FederationUtil.getAllConfiguredNS(configuration);
allConfiguredNS.add(CONCURRENT_NS);
- Map<String, Integer> nsAsyncActiveHandlerCount =
parseNsAsyncHandlerCount(configuration);
- initAsyncHandlerThreadPools(configuration, allConfiguredNS,
nsAsyncActiveHandlerCount);
+ Map<String, Integer> nsAsyncActiveHandlerCount =
parseNsAsyncHandlerCount(configuration,
+ DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
+ initAsyncHandlerThreadPools(configuration, allConfiguredNS, false,
+ DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT,
+ nsAsyncActiveHandlerCount, asyncRouterHandlerExecutors);
+
+ Map<String, Integer> nsAsyncObserverHandlerCount =
parseNsAsyncHandlerCount(configuration,
+ DFS_ROUTER_ASYNC_RPC_NS_OBSERVER_HANDLER_COUNT_KEY);
+ initAsyncHandlerThreadPools(configuration, allConfiguredNS, true,
+ DFS_ROUTER_ASYNC_RPC_OBSERVER_HANDLER_COUNT_KEY,
+ DFS_ROUTER_ASYNC_RPC_OBSERVER_HANDLER_COUNT_DEFAULT,
nsAsyncObserverHandlerCount,
+ asyncRouterOBHandlerExecutors);
+
initAsyncResponderThreadPools(configuration);
}
- private void initAsyncHandlerThreadPools(Configuration configuration,
- Set<String> allConfiguredNS, Map<String, Integer> nsAsyncHandlerCount) {
- LOG.info("Initializing asynchronous handler thread pools");
+ /**
+ * Initializes async handler executors for each configured namespace.
+ *
+ * @param configuration router configuration
+ * @param allConfiguredNS set of all configured namespace IDs
+ * @param useObserver true if these handlers serve observer namenodes
+ * @param handlerCountKey key for the default handler count
+ * @param handlerCountDefault default handler count
+ * @param nsAsyncHandlerCount per-namespace handler counts
+ * @param executors executor map to populate
+ */
+ private void initAsyncHandlerThreadPools(Configuration configuration,
Set<String> allConfiguredNS,
+ boolean useObserver, String handlerCountKey, int handlerCountDefault,
+ Map<String, Integer> nsAsyncHandlerCount, Map<String,
ThreadPoolExecutor> executors) {
+ String namenodeTypeForLogging = useObserver ? "Observer Namenode" :
"Active Namenode";
int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
if (asyncQueueSize < 1) {
throw new IllegalArgumentException("Async queue size must be at least
1");
}
- int asyncHandlerCountDefault =
configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
- DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
+ int asyncHandlerCountDefault = configuration.getInt(handlerCountKey,
handlerCountDefault);
+ // Skip separate observer handlers and let them share with active if not
configured
+ if (asyncHandlerCountDefault < 1 && nsAsyncHandlerCount.isEmpty() &&
useObserver) {
+ LOG.info("Async observer handlers are not configured, skipping...");
+ return;
+ } else {
+ useSeparateAsyncRouterOBHandlerExecutors = true;
+ }
Review Comment:
In `initAsyncHandlerThreadPools`:
```java
if (asyncHandlerCountDefault < 1 && nsAsyncHandlerCount.isEmpty() &&
useObserver) {
return;
} else {
useSeparateAsyncRouterOBHandlerExecutors = true;
}
```
This method is called twice — first for active (useObserver=false), then
for observer (useObserver=true). On the first call, useObserver=false
guarantees the if condition is false, so the else branch
sets the flag to true unconditionally. By the time the second call runs,
the flag is already true regardless of whether observer handlers are
configured.
The flag effectively never works as a gate. It happens to be functionally
correct by accident (empty asyncRouterOBHandlerExecutors map falls back to
routerDefaultAsyncHandlerExecutor), but when observer
is not configured, getAsyncExecutorForNamespace(nsId, true) returns the
global default executor instead of the per-ns active executor from
asyncRouterHandlerExecutors — which seems unintended.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]