Jackie-Jiang commented on code in PR #17494:
URL: https://github.com/apache/pinot/pull/17494#discussion_r2688669348
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -984,29 +994,49 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart
multiPart,
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Start to replace segments", notes = "Start to replace
segments")
- public Response startReplaceSegments(
+ public void startReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Force cleanup") @QueryParam("forceCleanup")
@DefaultValue("false") boolean forceCleanup,
@ApiParam(value = "Fields belonging to start replace segment request",
required = true)
- StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers) {
- tableName = DatabaseUtils.translateTableName(tableName, headers);
- TableType tableType = Constants.validateTableType(tableTypeStr);
- if (tableType == null) {
- throw new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
- Response.Status.BAD_REQUEST);
- }
- String tableNameWithType =
-
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- try {
- String segmentLineageEntryId =
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
- startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
- startReplaceSegmentsRequest.getCustomMap());
- return
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build();
- } catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
- }
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers,
+ @Suspended final AsyncResponse asyncResponse) {
+ _resourceExecutorService.execute(() -> {
+ try {
+ String translatedTableName =
DatabaseUtils.translateTableName(tableName, headers);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == null) {
+ asyncResponse.resume(
+ new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
+ Response.Status.BAD_REQUEST));
+ return;
+ }
+ String tableNameWithType =
+
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
translatedTableName, tableType,
+ LOGGER).get(0);
+ String segmentLineageEntryId =
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
+ startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
+ startReplaceSegmentsRequest.getCustomMap());
+ asyncResponse.resume(
+ Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build());
+ } catch (Exception e) {
+ String tableNameWithType = null;
Review Comment:
Does it have to be this complicated? Can we translate it in the main thread?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -984,29 +994,49 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart
multiPart,
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Start to replace segments", notes = "Start to replace
segments")
- public Response startReplaceSegments(
+ public void startReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Force cleanup") @QueryParam("forceCleanup")
@DefaultValue("false") boolean forceCleanup,
@ApiParam(value = "Fields belonging to start replace segment request",
required = true)
- StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers) {
- tableName = DatabaseUtils.translateTableName(tableName, headers);
- TableType tableType = Constants.validateTableType(tableTypeStr);
- if (tableType == null) {
- throw new ControllerApplicationException(LOGGER, "Table type should
either be offline or realtime",
- Response.Status.BAD_REQUEST);
- }
- String tableNameWithType =
-
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- try {
- String segmentLineageEntryId =
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
- startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
- startReplaceSegmentsRequest.getCustomMap());
- return
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build();
- } catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
- }
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers,
+ @Suspended final AsyncResponse asyncResponse) {
Review Comment:
(nit) We don't usually put `final` on local variables
##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -618,6 +620,11 @@ private void setUpPinotController() {
_tenantRebalancer =
new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_rebalancerExecutorService);
+ // Set up executor service for specific resources for isolation
+ _minionTaskResourceExecutorService =
+
createExecutorService(HttpServerThreadPoolConfig.defaultInstance().getCorePoolSize(),
Review Comment:
Do you want to honor the `ListenerConfig` or find a different way to
configure the pool size? E.g. we have a dedicated executor for rebalance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]