This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 441d335dcf0 Isolate potentially long-running minion task API resources 
on controller (#17494)
441d335dcf0 is described below

commit 441d335dcf04647c34353aec9213c8361f4009a0
Author: Jhow <[email protected]>
AuthorDate: Tue Feb 3 14:35:15 2026 -0800

    Isolate potentially long-running minion task API resources on controller 
(#17494)
---
 .../pinot/common/metrics/ControllerGauge.java      |   4 +-
 .../api/ControllerAdminApiApplication.java         |  46 +-
 .../pinot/controller/api/resources/Constants.java  |   2 +
 .../PinotSegmentUploadDownloadRestletResource.java |  61 +--
 .../api/resources/PinotTaskRestletResource.java    | 470 +++++++++++++++------
 .../resources/PinotTaskRestletResourceTest.java    |  54 ++-
 6 files changed, 464 insertions(+), 173 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 0e95f1915e5..2cb716a0af1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -221,7 +221,9 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // The progress of a certain table rebalance job of a table
   TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false),
   // HTTP thread utilization
-  HTTP_THREAD_UTILIZATION("httpThreadUtilization", true);
+  HTTP_THREAD_UTILIZATION("httpThreadUtilization", true),
+  // Track the concurrent executions of the API resources that use 
@ManagedAsync
+  MANAGED_ASYNC_ACTIVE_THREADS("threads", true);
 
 
   private final String _gaugeName;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
index 74abcbf813d..dddaba1881d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
@@ -18,15 +18,20 @@
  */
 package org.apache.pinot.controller.api;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.swagger.jaxrs.listing.SwaggerSerializers;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.container.ContainerRequestContext;
 import javax.ws.rs.container.ContainerResponseContext;
 import javax.ws.rs.container.ContainerResponseFilter;
+import javax.ws.rs.ext.Provider;
 import org.apache.pinot.common.audit.AuditLogFilter;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -50,7 +55,9 @@ import org.glassfish.grizzly.threadpool.ThreadPoolProbe;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.jackson.JacksonFeature;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.server.ManagedAsyncExecutor;
 import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.spi.ExecutorServiceProvider;
 
 
 public class ControllerAdminApiApplication extends ResourceConfig {
@@ -62,6 +69,7 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
   private final boolean _useHttps;
   private final boolean _enableSwagger;
   private HttpServer _httpServer;
+  private final ThreadPoolExecutor _managedAsyncExecutor;
 
   public ControllerAdminApiApplication(ControllerConf conf) {
     super();
@@ -83,6 +91,8 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
     register(new CorsFilter());
     register(AuthenticationFilter.class);
     register(AuditLogFilter.class);
+    _managedAsyncExecutor = createManagedAsyncExecutor();
+    register(new ManagedAsyncExecutorServiceProvider(_managedAsyncExecutor));
     // property("jersey.config.server.tracing.type", "ALL");
     // property("jersey.config.server.tracing.threshold", "VERBOSE");
   }
@@ -117,6 +127,7 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
         .addHttpHandler(new CLStaticHttpHandler(classLoader, 
"/webapp/images/"), "/images/");
     _httpServer.getServerConfiguration().addHttpHandler(new 
CLStaticHttpHandler(classLoader, "/webapp/js/"), "/js/");
     registerHttpThreadUtilizationGauge(controllerMetrics);
+    registerManagedAsyncThreadGauges(controllerMetrics);
   }
 
   public void stop() {
@@ -124,6 +135,7 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
       return;
     }
     _httpServer.shutdownNow();
+    _managedAsyncExecutor.shutdown();
   }
 
   private class CorsFilter implements ContainerResponseFilter {
@@ -145,8 +157,8 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
   }
 
   /**
-    * Registers a gauge that tracks HTTP thread pool utilization without using 
reflection.
-    * Instead, it uses a custom ThreadPoolProbe to count active threads.
+   * Registers a gauge that tracks HTTP thread pool utilization without using 
reflection.
+   * Instead, it uses a custom ThreadPoolProbe to count active threads.
    */
   private void registerHttpThreadUtilizationGauge(ControllerMetrics metrics) {
     NetworkListener listener = _httpServer.getListeners().iterator().next();
@@ -170,6 +182,16 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
     });
   }
 
+  private void registerManagedAsyncThreadGauges(ControllerMetrics metrics) {
+    
metrics.setOrUpdateGauge(ControllerGauge.MANAGED_ASYNC_ACTIVE_THREADS.getGaugeName(),
+        () -> (long) _managedAsyncExecutor.getActiveCount());
+  }
+
+  private ThreadPoolExecutor createManagedAsyncExecutor() {
+    ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat("managed-async-%d").build();
+    return (ThreadPoolExecutor) Executors.newCachedThreadPool(threadFactory);
+  }
+
   /**
    * Custom probe to track busy threads in Grizzly thread pools without using 
reflection.
    */
@@ -193,4 +215,24 @@ public class ControllerAdminApiApplication extends 
ResourceConfig {
       return _active.get();
     }
   }
+
+  @Provider
+  @ManagedAsyncExecutor
+  private static final class ManagedAsyncExecutorServiceProvider implements 
ExecutorServiceProvider {
+    private final ExecutorService _executorService;
+
+    private ManagedAsyncExecutorServiceProvider(ExecutorService 
executorService) {
+      _executorService = executorService;
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+      return _executorService;
+    }
+
+    @Override
+    public void dispose(ExecutorService executorService) {
+      // managed in ControllerAdminApiApplication.stop()
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index ef30166c5d8..77964ada533 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.api.resources;
 
+import javax.annotation.Nullable;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public class Constants {
   public static final String REALTIME_SEGMENT_VALIDATION_MANAGER = 
"RealtimeSegmentValidationManager";
   public static final String REALTIME_OFFSET_AUTO_RESET_MANAGER = 
"RealtimeOffsetAutoResetManager";
 
+  @Nullable
   public static TableType validateTableType(String tableTypeStr) {
     if (StringUtils.isBlank(tableTypeStr)) {
       return null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index bb9c2151410..75ff7eb298f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -126,15 +126,18 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.DATABASE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
 
 
-@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY),
-    @Authorization(value = DATABASE)})
+@Api(tags = Constants.SEGMENT_TAG, authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+    @Authorization(value = DATABASE)
+})
 @SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
     @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
         key = SWAGGER_AUTHORIZATION_KEY,
         description = "The format of the key is  ```\"Basic <token>\" or 
\"Bearer <token>\"```"),
     @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
         description = "Database context passed through http header. If no 
context is provided 'default' database "
-            + "context will be considered.")}))
+            + "context will be considered.")
+}))
 @Path("/")
 public class PinotSegmentUploadDownloadRestletResource {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotSegmentUploadDownloadRestletResource.class);
@@ -580,7 +583,7 @@ public class PinotSegmentUploadDownloadRestletResource {
 
     try {
       int entryCount = 0;
-      for (Map.Entry<String, SegmentMetadataInfo> entry: 
segmentsMetadataInfoMap.entrySet()) {
+      for (Map.Entry<String, SegmentMetadataInfo> entry : 
segmentsMetadataInfoMap.entrySet()) {
         String segmentName = entry.getKey();
         SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
         segmentNames.add(segmentName);
@@ -808,7 +811,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean enableParallelPushProtection,
       @ApiParam(value = "Whether to refresh if the segment already exists") 
@DefaultValue("true")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
-      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+      @Context HttpHeaders headers, @Context Request request, @Suspended 
AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), null, false,
           enableParallelPushProtection, allowRefresh, headers, request));
@@ -847,7 +850,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean enableParallelPushProtection,
       @ApiParam(value = "Whether to refresh if the segment already exists") 
@DefaultValue("true")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
-      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+      @Context HttpHeaders headers, @Context Request request, @Suspended 
AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
           enableParallelPushProtection, allowRefresh, headers, request));
@@ -895,7 +898,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean allowRefresh,
       @Context HttpHeaders headers,
       @Context Request request,
-      @Suspended final AsyncResponse asyncResponse) {
+      @Suspended AsyncResponse asyncResponse) {
     if (StringUtils.isEmpty(tableName)) {
       throw new ControllerApplicationException(LOGGER,
           "tableName is a required field while uploading segments in batch 
mode.", Response.Status.BAD_REQUEST);
@@ -949,7 +952,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean enableParallelPushProtection,
       @ApiParam(value = "Whether to refresh if the segment already exists") 
@DefaultValue("true")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
-      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+      @Context HttpHeaders headers, @Context Request request, @Suspended 
AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(
           uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), 
null, true, enableParallelPushProtection,
@@ -989,7 +992,7 @@ public class PinotSegmentUploadDownloadRestletResource {
       boolean enableParallelPushProtection,
       @ApiParam(value = "Whether to refresh if the segment already exists") 
@DefaultValue("true")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) 
boolean allowRefresh,
-      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+      @Context HttpHeaders headers, @Context Request request, @Suspended 
AsyncResponse asyncResponse) {
     try {
       asyncResponse.resume(uploadSegment(tableName, 
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
           enableParallelPushProtection, allowRefresh, headers, request));
@@ -1004,12 +1007,14 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Start to replace segments", notes = "Start to replace 
segments")
-  public Response startReplaceSegments(
+  @ManagedAsync
+  public void startReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") 
@DefaultValue("false") boolean forceCleanup,
       @ApiParam(value = "Fields belonging to start replace segment request", 
required = true)
-      StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context 
HttpHeaders headers) {
+      StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context 
HttpHeaders headers,
+      @Suspended AsyncResponse asyncResponse) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == null) {
@@ -1022,10 +1027,12 @@ public class PinotSegmentUploadDownloadRestletResource {
       String segmentLineageEntryId = 
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
           startReplaceSegmentsRequest.getSegmentsFrom(), 
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
           startReplaceSegmentsRequest.getCustomMap());
-      return 
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", 
segmentLineageEntryId)).build();
+      asyncResponse.resume(
+          Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", 
segmentLineageEntryId)).build());
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
-      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      asyncResponse.resume(
+          new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e));
     }
   }
 
@@ -1035,7 +1042,8 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "End to replace segments", notes = "End to replace 
segments")
-  public Response endReplaceSegments(
+  @ManagedAsync
+  public void endReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Segment lineage entry id returned by 
startReplaceSegments API", required = true)
@@ -1043,7 +1051,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @ApiParam(value = "Trigger an immediate segment cleanup") 
@QueryParam("cleanup") @DefaultValue("false")
       boolean cleanupSegments,
       @ApiParam(value = "Fields belonging to end replace segment request")
-      EndReplaceSegmentsRequest endReplaceSegmentsRequest, @Context 
HttpHeaders headers) {
+      EndReplaceSegmentsRequest endReplaceSegmentsRequest, @Context 
HttpHeaders headers,
+      @Suspended AsyncResponse asyncResponse) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == null) {
@@ -1060,10 +1069,11 @@ public class PinotSegmentUploadDownloadRestletResource {
       if (cleanupSegments) {
         
_pinotHelixResourceManager.invokeControllerPeriodicTask(tableNameWithType, 
RetentionManager.TASK_NAME, null);
       }
-      return Response.ok().build();
+      asyncResponse.resume(Response.ok().build());
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.NUMBER_END_REPLACE_FAILURE, 1);
-      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      asyncResponse.resume(
+          new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e));
     }
   }
 
@@ -1073,7 +1083,8 @@ public class PinotSegmentUploadDownloadRestletResource {
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Revert segments replacement", notes = "Revert 
segments replacement")
-  public Response revertReplaceSegments(
+  @ManagedAsync
+  public void revertReplaceSegments(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Segment lineage entry id to revert", required = true) 
@QueryParam("segmentLineageEntryId")
@@ -1081,7 +1092,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @ApiParam(value = "Force revert in case the user knows that the lineage 
entry is interrupted")
       @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert,
       @ApiParam(value = "Fields belonging to revert replace segment request")
-      RevertReplaceSegmentsRequest revertReplaceSegmentsRequest, @Context 
HttpHeaders headers) {
+      RevertReplaceSegmentsRequest revertReplaceSegmentsRequest, @Context 
HttpHeaders headers,
+      @Suspended AsyncResponse asyncResponse) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == null) {
@@ -1095,10 +1107,11 @@ public class PinotSegmentUploadDownloadRestletResource {
       Preconditions.checkNotNull(segmentLineageEntryId, 
"'segmentLineageEntryId' should not be null");
       _pinotHelixResourceManager.revertReplaceSegments(tableNameWithType, 
segmentLineageEntryId, forceRevert,
           revertReplaceSegmentsRequest);
-      return Response.ok().build();
+      asyncResponse.resume(Response.ok().build());
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.NUMBER_REVERT_REPLACE_FAILURE, 1);
-      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      asyncResponse.resume(
+          new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e));
     }
   }
 
@@ -1184,8 +1197,8 @@ public class PinotSegmentUploadDownloadRestletResource {
   }
 
   // The multipart input would contain a single multipart and this part would 
contain the segment metadata
-  // files (creation.meta, metadata.properties), and an additional mapping 
file names 'all_segments_metadata' which
-  // would contain the mappings from segment names to segment download URI's.
+// files (creation.meta, metadata.properties), and an additional mapping file 
names 'all_segments_metadata' which
+// would contain the mappings from segment names to segment download URI's.
   private static Map<String, SegmentMetadataInfo> 
createSegmentsMetadataInfoMap(FormDataMultiPart multiPart) {
     List<BodyPart> bodyParts = multiPart.getBodyParts();
     validateMultiPartForBatchSegmentUpload(bodyParts);
@@ -1217,7 +1230,7 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
 
     Map<String, SegmentMetadataInfo> segmentsMetadataInfoMap = new HashMap<>();
-    for (File file: segmentsMetadataFiles) {
+    for (File file : segmentsMetadataFiles) {
       String fileName = file.getName();
       if 
(fileName.equalsIgnoreCase(SegmentUploadConstants.ALL_SEGMENTS_METADATA_FILENAME))
 {
         try (InputStream inputStream = FileUtils.openInputStream(file)) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index a1c4e7912da..78d70bb7790 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -180,8 +180,14 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("List all task types")
-  public Set<String> listTaskTypes() {
-    return _pinotHelixTaskResourceManager.getTaskTypes();
+  @ManagedAsync
+  public void listTaskTypes(@Suspended AsyncResponse asyncResponse) {
+    try {
+      Set<String> taskTypes = _pinotHelixTaskResourceManager.getTaskTypes();
+      asyncResponse.resume(taskTypes);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -190,10 +196,18 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get summary of all tasks across all task types, grouped by 
tenant. "
       + "Optionally filter by server tenant name to get tasks for a specific 
tenant only.")
-  public PinotHelixTaskResourceManager.TaskSummaryResponse getTasksSummary(
+  @ManagedAsync
+  public void getTasksSummary(
       @ApiParam(value = "Server tenant name to filter tasks. If not specified, 
returns all tenants grouped.")
-      @QueryParam("tenant") @Nullable String tenantName) {
-    return _pinotHelixTaskResourceManager.getTasksSummary(tenantName);
+      @QueryParam("tenant") @Nullable String tenantName,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      PinotHelixTaskResourceManager.TaskSummaryResponse response =
+          _pinotHelixTaskResourceManager.getTasksSummary(tenantName);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -201,9 +215,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the state (task queue state) for the given task type")
-  public TaskState getTaskQueueState(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType) {
-    return _pinotHelixTaskResourceManager.getTaskQueueState(taskType);
+  @ManagedAsync
+  public void getTaskQueueState(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      TaskState state = 
_pinotHelixTaskResourceManager.getTaskQueueState(taskType);
+      asyncResponse.resume(state);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -211,13 +232,18 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("List all tasks for the given task type")
-  public Set<String> getTasks(@ApiParam(value = "Task type", required = true) 
@PathParam("taskType") String taskType) {
-    Set<String> tasks = _pinotHelixTaskResourceManager.getTasks(taskType);
-    if (tasks == null) {
-      throw new NotFoundException("No tasks found for task type: " + taskType);
+  @ManagedAsync
+  public void getTasks(@ApiParam(value = "Task type", required = true) 
@PathParam("taskType") String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Set<String> tasks = _pinotHelixTaskResourceManager.getTasks(taskType);
+      if (tasks == null) {
+        throw new NotFoundException("No tasks found for task type: " + 
taskType);
+      }
+      asyncResponse.resume(tasks);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
     }
-
-    return tasks;
   }
 
   @GET
@@ -225,8 +251,15 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK_COUNT)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Count of all tasks for the given task type")
-  public int getTasksCount(@ApiParam(value = "Task type", required = true) 
@PathParam("taskType") String taskType) {
-    return _pinotHelixTaskResourceManager.getTasks(taskType).size();
+  @ManagedAsync
+  public void getTasksCount(@ApiParam(value = "Task type", required = true) 
@PathParam("taskType") String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      int count = _pinotHelixTaskResourceManager.getTasks(taskType).size();
+      asyncResponse.resume(count);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -234,12 +267,19 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("List all tasks for the given task type")
-  public Map<String, TaskState> getTaskStatesByTable(
+  @ManagedAsync
+  public void getTaskStatesByTable(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "Table name with type", required = true) 
@PathParam("tableNameWithType")
-      String tableNameWithType, @Context HttpHeaders headers) {
-    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
-    return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, 
tableNameWithType);
+      String tableNameWithType, @Context HttpHeaders headers, @Suspended 
AsyncResponse asyncResponse) {
+    try {
+      String translatedTableName = 
DatabaseUtils.translateTableName(tableNameWithType, headers);
+      Map<String, TaskState> states =
+          _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType, 
translatedTableName);
+      asyncResponse.resume(states);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -274,7 +314,8 @@ public class PinotTaskRestletResource {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
     _pinotHelixTaskResourceManager.deleteTaskMetadataByTable(taskType, 
tableNameWithType);
     return new SuccessResponse(
-        String.format("Successfully deleted metadata for task type: %s from 
table: %s", taskType, tableNameWithType));
+        String.format("Successfully deleted metadata for task type: %s from 
table: %s", taskType,
+            tableNameWithType));
   }
 
   @GET
@@ -282,7 +323,8 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Fetch count of sub-tasks for each of the tasks for the given 
task type")
-  public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts(
+  @ManagedAsync
+  public void getTaskCounts(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "Task state(s) to filter by. Can be single state or 
comma-separated multiple states "
           + "(NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED, 
ABORTED, TIMED_OUT, TIMING_OUT, "
@@ -290,13 +332,19 @@ public class PinotTaskRestletResource {
       @QueryParam("state") @Nullable String state,
       @ApiParam(value = "Table name with type (e.g., 'myTable_OFFLINE') to 
filter tasks by table. "
           + "Only tasks that have subtasks for this table will be returned.")
-      @QueryParam("table") @Nullable String table, @Context HttpHeaders 
headers) {
-    String tableNameWithType = table != null ? 
DatabaseUtils.translateTableName(table, headers) : null;
-
-    if (StringUtils.isNotEmpty(state) || 
StringUtils.isNotEmpty(tableNameWithType)) {
-      return _pinotHelixTaskResourceManager.getTaskCounts(taskType, state, 
tableNameWithType);
-    } else {
-      return _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+      @QueryParam("table") @Nullable String table, @Context HttpHeaders 
headers,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      String tableNameWithType = table != null ? 
DatabaseUtils.translateTableName(table, headers) : null;
+      Map<String, PinotHelixTaskResourceManager.TaskCount> counts;
+      if (StringUtils.isNotEmpty(state) || 
StringUtils.isNotEmpty(tableNameWithType)) {
+        counts = _pinotHelixTaskResourceManager.getTaskCounts(taskType, state, 
tableNameWithType);
+      } else {
+        counts = _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+      }
+      asyncResponse.resume(counts);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
     }
   }
 
@@ -305,13 +353,21 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DEBUG_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Fetch information for all the tasks for the given task type")
-  public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> 
getTasksDebugInfo(
+  @ManagedAsync
+  public void getTasksDebugInfo(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "verbosity (Prints information for all the tasks for 
the given task type."
           + "By default, only prints subtask details for running and error 
tasks. "
           + "Value of > 0 prints subtask details for all tasks)")
-      @DefaultValue("0") @QueryParam("verbosity") int verbosity) {
-    return _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType, 
verbosity);
+      @DefaultValue("0") @QueryParam("verbosity") int verbosity,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> debugInfo =
+          _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType, 
verbosity);
+      asyncResponse.resume(debugInfo);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -319,16 +375,24 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DEBUG_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Fetch information for all the tasks for the given task type 
and table")
-  public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> 
getTasksDebugInfo(
+  @ManagedAsync
+  public void getTasksDebugInfo(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "Table name with type", required = true) 
@PathParam("tableNameWithType")
       String tableNameWithType,
       @ApiParam(value = "verbosity (Prints information for all the tasks for 
the given task type and table."
           + "By default, only prints subtask details for running and error 
tasks. "
           + "Value of > 0 prints subtask details for all tasks)")
-      @DefaultValue("0") @QueryParam("verbosity") int verbosity, @Context 
HttpHeaders headers) {
-    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
-    return _pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType, 
tableNameWithType, verbosity);
+      @DefaultValue("0") @QueryParam("verbosity") int verbosity, @Context 
HttpHeaders headers,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      String translatedTableName = 
DatabaseUtils.translateTableName(tableNameWithType, headers);
+      Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> debugInfo =
+          _pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType, 
translatedTableName, verbosity);
+      asyncResponse.resume(debugInfo);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -336,22 +400,32 @@ public class PinotTaskRestletResource {
   @Path("/tasks/generator/{tableNameWithType}/{taskType}/debug")
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @ApiOperation("Fetch task generation information for the recent runs of the 
given task for the given table")
-  public String getTaskGenerationDebugInto(
+  @ManagedAsync
+  public void getTaskGenerationDebugInfo(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "Table name with type", required = true) 
@PathParam("tableNameWithType")
       String tableNameWithType,
       @ApiParam(value = "Whether to only lookup local cache for logs", 
defaultValue = "false") @QueryParam("localOnly")
-      boolean localOnly, @Context HttpHeaders httpHeaders)
+      boolean localOnly, @Context HttpHeaders httpHeaders, @Suspended 
AsyncResponse asyncResponse) {
+    try {
+      String result = getTaskGenerationDebugInfoSync(taskType, 
tableNameWithType, localOnly, httpHeaders);
+      asyncResponse.resume(result);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
+  }
+
+  private String getTaskGenerationDebugInfoSync(String taskType, String 
tableNameWithType, boolean localOnly,
+      HttpHeaders httpHeaders)
       throws JsonProcessingException {
-    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
httpHeaders);
+    String translatedTableName = 
DatabaseUtils.translateTableName(tableNameWithType, httpHeaders);
     if (localOnly) {
       BaseTaskGeneratorInfo taskGeneratorMostRecentRunInfo =
-          _taskManagerStatusCache.fetchTaskGeneratorInfo(tableNameWithType, 
taskType);
+          _taskManagerStatusCache.fetchTaskGeneratorInfo(translatedTableName, 
taskType);
       if (taskGeneratorMostRecentRunInfo == null) {
         throw new ControllerApplicationException(LOGGER, "Task generation 
information not found",
             Response.Status.NOT_FOUND);
       }
-
       return JsonUtils.objectToString(taskGeneratorMostRecentRunInfo);
     }
 
@@ -360,7 +434,7 @@ public class PinotTaskRestletResource {
     // Relying on original schema that was used to query the controller
     URI uri = _uriInfo.getRequestUri();
     String scheme = uri.getScheme();
-    String finalTableNameWithType = tableNameWithType;
+    String finalTableNameWithType = translatedTableName;
     List<String> controllerUrls = controllers.stream().map(controller -> String
         .format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", 
scheme, controller.getHostName(),
             Integer.parseInt(controller.getPort()), finalTableNameWithType, 
taskType)).collect(Collectors.toList());
@@ -392,7 +466,8 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DEBUG_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Fetch information for the given task name")
-  public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo(
+  @ManagedAsync
+  public void getTaskDebugInfo(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
       @ApiParam(value = "verbosity (Prints information for the given task 
name."
           + "By default, only prints subtask details for running and error 
tasks. "
@@ -400,11 +475,18 @@ public class PinotTaskRestletResource {
       @DefaultValue("0") @QueryParam("verbosity") int verbosity,
       @ApiParam(value = "Table name with type (e.g., 'myTable_OFFLINE') to 
filter subtasks by table. "
           + "Only subtasks for this table will be returned.")
-      @QueryParam("tableName") @Nullable String tableNameWithType, @Context 
HttpHeaders httpHeaders) {
-    if (tableNameWithType != null) {
-      tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
httpHeaders);
+      @QueryParam("tableName") @Nullable String tableNameWithType, @Context 
HttpHeaders httpHeaders,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      String translatedTableName = tableNameWithType != null
+          ? DatabaseUtils.translateTableName(tableNameWithType, httpHeaders)
+          : null;
+      PinotHelixTaskResourceManager.TaskDebugInfo debugInfo =
+          _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName, 
translatedTableName, verbosity);
+      asyncResponse.resume(debugInfo);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
     }
-    return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName, 
tableNameWithType, verbosity);
   }
 
   @GET
@@ -412,9 +494,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get a map from task to task state for the given task type")
-  public Map<String, TaskState> getTaskStates(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType) {
-    return _pinotHelixTaskResourceManager.getTaskStates(taskType);
+  @ManagedAsync
+  public void getTaskStates(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Map<String, TaskState> states = 
_pinotHelixTaskResourceManager.getTaskStates(taskType);
+      asyncResponse.resume(states);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -422,9 +511,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the task state for the given task")
-  public TaskState getTaskState(
-      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName) {
-    return _pinotHelixTaskResourceManager.getTaskState(taskName);
+  @ManagedAsync
+  public void getTaskState(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      TaskState state = _pinotHelixTaskResourceManager.getTaskState(taskName);
+      asyncResponse.resume(state);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -432,9 +528,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the states of all the sub tasks for the given task")
-  public Map<String, TaskPartitionState> getSubtaskStates(
-      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName) {
-    return _pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+  @ManagedAsync
+  public void getSubtaskStates(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Map<String, TaskPartitionState> states = 
_pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+      asyncResponse.resume(states);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -442,9 +545,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the task config (a list of child task configs) for the 
given task")
-  public List<PinotTaskConfig> getTaskConfigs(
-      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName) {
-    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+  @ManagedAsync
+  public void getTaskConfigs(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      List<PinotTaskConfig> configs = 
_pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+      asyncResponse.resume(configs);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -452,9 +562,16 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the task runtime config for the given task")
-  public Map<String, String> getTaskConfig(
-      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName) {
-    return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
+  @ManagedAsync
+  public void getTaskConfig(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Map<String, String> config = 
_pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
+      asyncResponse.resume(config);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -462,11 +579,19 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get the configs of specified sub tasks for the given task")
-  public Map<String, PinotTaskConfig> getSubtaskConfigs(
+  @ManagedAsync
+  public void getSubtaskConfigs(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
       @ApiParam(value = "Sub task names separated by comma") 
@QueryParam("subtaskNames") @Nullable
-      String subtaskNames) {
-    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, 
subtaskNames);
+      String subtaskNames,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      Map<String, PinotTaskConfig> configs =
+          _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, 
subtaskNames);
+      asyncResponse.resume(configs);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @GET
@@ -474,10 +599,23 @@ public class PinotTaskRestletResource {
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_TASK)
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get progress of specified sub tasks for the given task 
tracked by minion worker in memory")
-  public String getSubtaskProgress(@Context HttpHeaders httpHeaders,
+  @ManagedAsync
+  public void getSubtaskProgress(@Context HttpHeaders httpHeaders,
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
       @ApiParam(value = "Sub task names separated by comma") 
@QueryParam("subtaskNames") @Nullable
-      String subtaskNames) {
+      String subtaskNames,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      String progress = getSubtaskProgressSync(httpHeaders, taskName, 
subtaskNames);
+      asyncResponse.resume(progress);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
+  }
+
+  private String getSubtaskProgressSync(HttpHeaders httpHeaders, String 
taskName,
+      @Nullable String subtaskNames)
+      throws Exception {
     // Relying on original schema that was used to query the controller
     String scheme = _uriInfo.getRequestUri().getScheme();
     List<InstanceConfig> workers = 
_pinotHelixResourceManager.getAllMinionInstanceConfigs();
@@ -513,11 +651,24 @@ public class PinotTaskRestletResource {
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
   })
-  public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
+  @ManagedAsync
+  public void getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
       @ApiParam(value = "Subtask state 
(UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true)
       @QueryParam("subTaskState") String subTaskState,
       @ApiParam(value = "Minion worker IDs separated by comma") 
@QueryParam("minionWorkerIds") @Nullable
-      String minionWorkerIds) {
+      String minionWorkerIds,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      String progress = getSubtaskOnWorkerProgressSync(httpHeaders, 
subTaskState, minionWorkerIds);
+      asyncResponse.resume(progress);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
+  }
+
+  private String getSubtaskOnWorkerProgressSync(HttpHeaders httpHeaders, 
String subTaskState,
+      @Nullable String minionWorkerIds)
+      throws Exception {
     Set<String> selectedMinionWorkers = new HashSet<>();
     if (StringUtils.isNotEmpty(minionWorkerIds)) {
       selectedMinionWorkers.addAll(
@@ -673,71 +824,83 @@ public class PinotTaskRestletResource {
   @ApiOperation("Schedule tasks and return a map from task type to task name 
scheduled. If task type is missing, "
       + "schedules all tasks. If table name is missing, schedules tasks for 
all tables in the database. If database "
       + "is missing in headers, uses default.")
-  @Nullable
-  public Map<String, String> scheduleTasks(
+  @ManagedAsync
+  public void scheduleTasks(
       @ApiParam(value = "Task type. If missing, schedules all tasks.") 
@QueryParam("taskType") @Nullable
       String taskType,
       @ApiParam(value = "Table name (with type suffix). If missing, schedules 
tasks for all tables in the database.")
       @QueryParam("tableName") @Nullable String tableName,
       @ApiParam(value = "Minion Instance tag to schedule the task explicitly 
on") @QueryParam("minionInstanceTag")
-      @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
-    String database = headers != null ? headers.getHeaderString(DATABASE) : 
DEFAULT_DATABASE;
-    Map<String, String> response = new HashMap<>();
-    List<String> generationErrors = new ArrayList<>();
-    List<String> schedulingErrors = new ArrayList<>();
-    TaskSchedulingContext context = new TaskSchedulingContext()
-        .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
-        .setMinionInstanceTag(minionInstanceTag)
-        .setLeader(false);
-    if (taskType != null) {
-      context.setTasksToSchedule(Collections.singleton(taskType));
-    }
-    if (tableName != null) {
-      
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
 headers)));
-    } else {
-      context.setDatabasesToSchedule(Collections.singleton(database));
-    }
-    Map<String, TaskSchedulingInfo> allTaskInfos = 
_pinotTaskManager.scheduleTasks(context);
-    allTaskInfos.forEach((key, value) -> {
-      if (value.getScheduledTaskNames() != null) {
-        response.put(key, String.join(",", value.getScheduledTaskNames()));
+      @Nullable String minionInstanceTag, @Context HttpHeaders headers, 
@Suspended AsyncResponse asyncResponse) {
+    try {
+      String database = headers != null ? headers.getHeaderString(DATABASE) : 
DEFAULT_DATABASE;
+      Map<String, String> response = new HashMap<>();
+      List<String> generationErrors = new ArrayList<>();
+      List<String> schedulingErrors = new ArrayList<>();
+      TaskSchedulingContext context = new TaskSchedulingContext()
+          .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
+          .setMinionInstanceTag(minionInstanceTag)
+          .setLeader(false);
+      if (taskType != null) {
+        context.setTasksToSchedule(Collections.singleton(taskType));
       }
-      generationErrors.addAll(value.getGenerationErrors());
-      schedulingErrors.addAll(value.getSchedulingErrors());
-    });
-    response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
-    response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
-    return response;
+      if (tableName != null) {
+        
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
 headers)));
+      } else {
+        context.setDatabasesToSchedule(Collections.singleton(database));
+      }
+      Map<String, TaskSchedulingInfo> allTaskInfos = 
_pinotTaskManager.scheduleTasks(context);
+      allTaskInfos.forEach((key, value) -> {
+        if (value.getScheduledTaskNames() != null) {
+          response.put(key, String.join(",", value.getScheduledTaskNames()));
+        }
+        generationErrors.addAll(value.getGenerationErrors());
+        schedulingErrors.addAll(value.getSchedulingErrors());
+      });
+      response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
+      response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(new ControllerApplicationException(LOGGER,
+          String.format("Failed to schedule tasks due to error: %s", 
ExceptionUtils.getStackTrace(e)),
+          Response.Status.INTERNAL_SERVER_ERROR, e));
+    }
   }
 
   @POST
-  @ManagedAsync
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tasks/execute")
   @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.EXECUTE_TASK)
   @Authenticate(AccessType.CREATE)
   @ApiOperation("Execute a task on minion")
+  @ManagedAsync
   public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended 
AsyncResponse asyncResponse,
       @Context Request requestContext) {
     try {
-      
asyncResponse.resume(_pinotTaskManager.createTask(adhocTaskConfig.getTaskType(),
 adhocTaskConfig.getTableName(),
-          adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs()));
-    } catch (TableNotFoundException e) {
-      throw new ControllerApplicationException(LOGGER, "Failed to find table: 
" + adhocTaskConfig.getTableName(),
-          Response.Status.NOT_FOUND, e);
-    } catch (TaskAlreadyExistsException e) {
-      throw new ControllerApplicationException(LOGGER, "Task already exists: " 
+ adhocTaskConfig.getTaskName(),
-          Response.Status.CONFLICT, e);
-    } catch (UnknownTaskTypeException e) {
-      throw new ControllerApplicationException(LOGGER, "Unknown task type: " + 
adhocTaskConfig.getTaskType(),
-          Response.Status.NOT_FOUND, e);
-    } catch (NoTaskScheduledException e) {
-      throw new ControllerApplicationException(LOGGER,
-          "No task is generated for table: " + adhocTaskConfig.getTableName() 
+ ", with task type: "
-              + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST, e);
+      try {
+        asyncResponse.resume(
+            _pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), 
adhocTaskConfig.getTableName(),
+                adhocTaskConfig.getTaskName(), 
adhocTaskConfig.getTaskConfigs()));
+      } catch (TableNotFoundException e) {
+        throw new ControllerApplicationException(LOGGER, "Failed to find 
table: " + adhocTaskConfig.getTableName(),
+            Response.Status.NOT_FOUND, e);
+      } catch (TaskAlreadyExistsException e) {
+        throw new ControllerApplicationException(LOGGER, "Task already exists: 
" + adhocTaskConfig.getTaskName(),
+            Response.Status.CONFLICT, e);
+      } catch (UnknownTaskTypeException e) {
+        throw new ControllerApplicationException(LOGGER, "Unknown task type: " 
+ adhocTaskConfig.getTaskType(),
+            Response.Status.NOT_FOUND, e);
+      } catch (NoTaskScheduledException e) {
+        throw new ControllerApplicationException(LOGGER,
+            "No task is generated for table: " + 
adhocTaskConfig.getTableName() + ", with task type: "
+                + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST, 
e);
+      } catch (Exception e) {
+        throw new ControllerApplicationException(LOGGER,
+            "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), 
Response.Status.INTERNAL_SERVER_ERROR,
+            e);
+      }
     } catch (Exception e) {
-      throw new ControllerApplicationException(LOGGER,
-          "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+      asyncResponse.resume(e);
     }
   }
 
@@ -747,10 +910,17 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Authenticate(AccessType.UPDATE)
   @ApiOperation("Clean up finished tasks (COMPLETED, FAILED) for the given 
task type")
-  public SuccessResponse cleanUpTasks(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType) {
-    _pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
-    return new SuccessResponse("Successfully cleaned up tasks for task type: " 
+ taskType);
+  @ManagedAsync
+  public void cleanUpTasks(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      _pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
+      SuccessResponse response = new SuccessResponse("Successfully cleaned up 
tasks for task type: " + taskType);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @PUT
@@ -759,10 +929,17 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Authenticate(AccessType.UPDATE)
   @ApiOperation("Stop all running/pending tasks (as well as the task queue) 
for the given task type")
-  public SuccessResponse stopTasks(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType) {
-    _pinotHelixTaskResourceManager.stopTaskQueue(taskType);
-    return new SuccessResponse("Successfully stopped tasks for task type: " + 
taskType);
+  @ManagedAsync
+  public void stopTasks(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      _pinotHelixTaskResourceManager.stopTaskQueue(taskType);
+      SuccessResponse response = new SuccessResponse("Successfully stopped 
tasks for task type: " + taskType);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @PUT
@@ -771,10 +948,17 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Authenticate(AccessType.UPDATE)
   @ApiOperation("Resume all stopped tasks (as well as the task queue) for the 
given task type")
-  public SuccessResponse resumeTasks(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType) {
-    _pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
-    return new SuccessResponse("Successfully resumed tasks for task type: " + 
taskType);
+  @ManagedAsync
+  public void resumeTasks(
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      _pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
+      SuccessResponse response = new SuccessResponse("Successfully resumed 
tasks for task type: " + taskType);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @DELETE
@@ -783,12 +967,19 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Authenticate(AccessType.DELETE)
   @ApiOperation("Delete all tasks (as well as the task queue) for the given 
task type")
-  public SuccessResponse deleteTasks(
+  @ManagedAsync
+  public void deleteTasks(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") 
String taskType,
       @ApiParam(value = "Whether to force deleting the tasks (expert only 
option, enable with cautious")
-      @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
-    _pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
-    return new SuccessResponse("Successfully deleted tasks for task type: " + 
taskType);
+      @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      _pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
+      SuccessResponse response = new SuccessResponse("Successfully deleted 
tasks for task type: " + taskType);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @DELETE
@@ -797,12 +988,19 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Authenticate(AccessType.DELETE)
   @ApiOperation("Delete a single task given its task name")
-  public SuccessResponse deleteTask(
+  @ManagedAsync
+  public void deleteTask(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
       @ApiParam(value = "Whether to force deleting the task (expert only 
option, enable with cautious")
-      @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
-    _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
-    return new SuccessResponse("Successfully deleted task: " + taskName);
+      @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
+      SuccessResponse response = new SuccessResponse("Successfully deleted 
task: " + taskName);
+      asyncResponse.resume(response);
+    } catch (Exception e) {
+      asyncResponse.resume(e);
+    }
   }
 
   @DELETE
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
index 673554c4caf..8fa53df4dc2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
@@ -25,9 +25,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.UriInfo;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
@@ -45,10 +48,10 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 
 
@@ -61,6 +64,10 @@ public class PinotTaskRestletResourceTest {
   ControllerConf _controllerConf;
   @Mock
   UriInfo _uriInfo;
+  @Mock
+  Executor _executor;
+  @Mock
+  HttpClientConnectionManager _connectionManager;
 
   @InjectMocks
   PinotTaskRestletResource _pinotTaskRestletResource;
@@ -108,13 +115,18 @@ public class PinotTaskRestletResourceTest {
     
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(List.of(minion1,
 minion2));
     HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
     when(httpHeaders.getRequestHeaders()).thenReturn(new 
MultivaluedHashMap<>());
+    when(_controllerConf.getMinionAdminRequestTimeoutSeconds()).thenReturn(10);
+    @SuppressWarnings("unchecked")
     ArgumentCaptor<Map<String, String>> minionWorkerEndpointsCaptor = 
ArgumentCaptor.forClass(Map.class);
     
when(_pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(anyString(), 
any(), any(),
         minionWorkerEndpointsCaptor.capture(), anyMap(), anyInt()))
         .thenReturn(Collections.emptyMap());
-    String progress =
-        _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", minionWorkerIds);
-    assertEquals(progress, "{}");
+    AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+    ArgumentCaptor<Object> responseCaptor = 
ArgumentCaptor.forClass(Object.class);
+    _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", minionWorkerIds, asyncResponse);
+
+    verify(asyncResponse).resume(responseCaptor.capture());
+    assertEquals(responseCaptor.getValue(), "{}");
     return minionWorkerEndpointsCaptor.getValue();
   }
 
@@ -131,11 +143,16 @@ public class PinotTaskRestletResourceTest {
     
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(Collections.emptyList());
     HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
     when(httpHeaders.getRequestHeaders()).thenReturn(new 
MultivaluedHashMap<>());
+    when(_controllerConf.getMinionAdminRequestTimeoutSeconds()).thenReturn(10);
     when(_pinotHelixTaskResourceManager
         .getSubtaskOnWorkerProgress(anyString(), any(), any(), anyMap(), 
anyMap(), anyInt()))
         .thenThrow(new RuntimeException());
-    assertThrows(ControllerApplicationException.class,
-        () -> 
_pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", null));
+    AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+    ArgumentCaptor<Throwable> responseCaptor = 
ArgumentCaptor.forClass(Throwable.class);
+    _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", null, asyncResponse);
+
+    verify(asyncResponse).resume(responseCaptor.capture());
+    assertTrue(responseCaptor.getValue() instanceof 
ControllerApplicationException);
   }
 
   @Test
@@ -146,8 +163,14 @@ public class PinotTaskRestletResourceTest {
 
     
when(_pinotHelixTaskResourceManager.getTasksSummary(null)).thenReturn(emptyResponse);
 
-    PinotHelixTaskResourceManager.TaskSummaryResponse response = 
_pinotTaskRestletResource.getTasksSummary(null);
+    AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+    ArgumentCaptor<Object> responseCaptor = 
ArgumentCaptor.forClass(Object.class);
+    _pinotTaskRestletResource.getTasksSummary(null, asyncResponse);
+
+    verify(asyncResponse).resume(responseCaptor.capture());
 
+    PinotHelixTaskResourceManager.TaskSummaryResponse response =
+        (PinotHelixTaskResourceManager.TaskSummaryResponse) 
responseCaptor.getValue();
     assertNotNull(response);
     assertEquals(response.getTotalRunningTasks(), 0);
     assertEquals(response.getTotalWaitingTasks(), 0);
@@ -183,8 +206,14 @@ public class PinotTaskRestletResourceTest {
 
     
when(_pinotHelixTaskResourceManager.getTasksSummary(null)).thenReturn(response);
 
-    PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse = 
_pinotTaskRestletResource.getTasksSummary(null);
+    AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+    ArgumentCaptor<Object> responseCaptor = 
ArgumentCaptor.forClass(Object.class);
+    _pinotTaskRestletResource.getTasksSummary(null, asyncResponse);
 
+    verify(asyncResponse).resume(responseCaptor.capture());
+
+    PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
+        (PinotHelixTaskResourceManager.TaskSummaryResponse) 
responseCaptor.getValue();
     assertNotNull(actualResponse);
     assertEquals(actualResponse.getTotalRunningTasks(), 150);
     assertEquals(actualResponse.getTotalWaitingTasks(), 50);
@@ -228,9 +257,14 @@ public class PinotTaskRestletResourceTest {
 
     
when(_pinotHelixTaskResourceManager.getTasksSummary("defaultTenant")).thenReturn(response);
 
-    PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
-        _pinotTaskRestletResource.getTasksSummary("defaultTenant");
+    AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+    ArgumentCaptor<Object> responseCaptor = 
ArgumentCaptor.forClass(Object.class);
+    _pinotTaskRestletResource.getTasksSummary("defaultTenant", asyncResponse);
 
+    verify(asyncResponse).resume(responseCaptor.capture());
+
+    PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
+        (PinotHelixTaskResourceManager.TaskSummaryResponse) 
responseCaptor.getValue();
     assertNotNull(actualResponse);
     assertEquals(actualResponse.getTotalRunningTasks(), 100);
     assertEquals(actualResponse.getTotalWaitingTasks(), 30);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to