This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch offload-web-response-from-metadata-thread in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f52e5647724a792fd570f3f21473b5860cc089e0 Author: mattisonchao <[email protected]> AuthorDate: Thu Mar 5 00:47:06 2026 +0800 [improve][broker] Offload web response from metadata thread for list tenants/namespaces/clusters The list tenants, list namespaces, and list clusters REST endpoints currently execute asyncResponse.resume() on the metadata store thread. This can block the metadata thread pool, leading to deadlocks or performance degradation. Offload the response handling (sorting, filtering, collecting) and asyncResponse.resume() to the web service executor thread pool by using thenAcceptAsync/thenApplyAsync with the web executor. - TenantsBase.getTenants(): thenAccept -> thenAcceptAsync - ClustersBase.getClusters(): thenApply+thenAccept -> thenAcceptAsync - Namespaces.getTenantNamespaces(): thenAccept -> thenAcceptAsync - WebService: expose webServiceExecutor via @Getter Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java | 5 ++--- .../main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java | 4 ++-- .../src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 2 +- .../src/main/java/org/apache/pulsar/broker/web/WebService.java | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 0ae18fc2e54..b79b857c5d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -93,11 +93,10 @@ public class ClustersBase extends AdminResource { }) public void getClusters(@Suspended AsyncResponse asyncResponse) { clusterResources().listAsync() - .thenApply(clusters -> clusters.stream() + .thenAcceptAsync(clusters -> asyncResponse.resume(clusters.stream() // Remove "global" cluster from returned list .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster)) - .collect(Collectors.toSet())) - .thenAccept(asyncResponse::resume) + .collect(Collectors.toSet())), pulsar().getWebService().getWebServiceExecutor()) .exceptionally(ex -> { log.error("[{}] Failed to get clusters {}", clientAppId(), ex); resumeAsyncResponseExceptionally(asyncResponse, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index ca16259c793..3c6a9ccd644 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -65,12 +65,12 @@ public class TenantsBase extends PulsarWebResource { final String clientAppId = clientAppId(); validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS) .thenCompose(__ -> tenantResources().listTenantsAsync()) - .thenAccept(tenants -> { + .thenAcceptAsync(tenants -> { // deep copy the tenants to avoid concurrent sort exception List<String> deepCopy = new ArrayList<>(tenants); deepCopy.sort(null); asyncResponse.resume(deepCopy); - }).exceptionally(ex -> { + }, pulsar().getWebService().getWebServiceExecutor()).exceptionally(ex -> { log.error("[{}] Failed to get tenants list", clientAppId, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 90f4b087bfe..97886ae99eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -106,7 +106,7 @@ public class Namespaces extends NamespacesBase { public void getTenantNamespaces(@Suspended final AsyncResponse response, @PathParam("tenant") String tenant) { internalGetTenantNamespaces(tenant) - .thenAccept(response::resume) + .thenAcceptAsync(response::resume, pulsar().getWebService().getWebServiceExecutor()) .exceptionally(ex -> { log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex); resumeAsyncResponseExceptionally(response, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 21c99f8196f..3e1250e634e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -94,8 +94,8 @@ public class WebService implements AutoCloseable { @Deprecated private final WebExecutorStats executorStats; private final WebExecutorThreadPoolStats webExecutorThreadPoolStats; + @Getter private final WebExecutorThreadPool webServiceExecutor; - private final ServerConnector httpConnector; private final ServerConnector httpsConnector; private final FilterInitializer filterInitializer;
