Jackie-Jiang commented on code in PR #16045:
URL: https://github.com/apache/pinot/pull/16045#discussion_r2146237825
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
Review Comment:
We usually use `tables` as the prefix. We don't need the table type, since this
only applies to `REALTIME` tables.
```suggestion
@Path("/tables/{tableName}/{instanceId}/ingestionMetrics")
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action =
+ Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionGroupID from serverInstance",
+ notes = "Removes ingestion-related metrics from serverInstance for
partition(s) under the specified table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics."),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public SuccessResponse removeIngestionMetrics(
+ @ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "Instance name of the server", required = true)
@PathParam("instanceName")
+ String instanceName,
+ @ApiParam(value = "Comma-separated list of partition group IDs
(optional)") @QueryParam("partitionGroupId")
Review Comment:
Does it take a single comma-separated value, or does it have to be specified as
multiple parameters, such as `partitionId=1&partitionId=2`?
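
To make the two request shapes concrete, here is a minimal plain-Java sketch (not code from this PR). As far as I know, a collection-typed `@QueryParam` in JAX-RS gathers repeated occurrences of the parameter and does not split on commas by itself, so `partitionId=1,2` would need explicit handling. The class and method names below are hypothetical, and the parsing is done by hand only to illustrate the difference.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, for illustration only: accepts both
// "partitionId=1&partitionId=2" and "partitionId=1,2".
public class PartitionIdQuerySketch {

  static Set<Integer> parsePartitionIds(String query) {
    Set<Integer> partitionIds = new LinkedHashSet<>();
    for (String pair : query.split("&")) {
      int eq = pair.indexOf('=');
      // Skip anything that is not a "partitionId=<value>" pair.
      if (eq <= 0 || !pair.substring(0, eq).equals("partitionId")) {
        continue;
      }
      // Split each value on commas so both request shapes are accepted.
      for (String token : pair.substring(eq + 1).split(",")) {
        String trimmed = token.trim();
        if (!trimmed.isEmpty()) {
          partitionIds.add(Integer.parseInt(trimmed));
        }
      }
    }
    return partitionIds;
  }

  public static void main(String[] args) {
    System.out.println(parsePartitionIds("partitionId=1&partitionId=2"));
    System.out.println(parsePartitionIds("partitionId=1,2"));
  }
}
```

If only the repeated-parameter form is supported, the `@ApiParam` description should say so instead of "Comma-separated list".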
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int
partitionId) {
removePartitionId(partitionId);
}
+ /**
+ * Handles all partition removal event. This must be invoked when we stop
serving partitions for this table in the
+ * current server.
+ *
+ * @return Set of partitionIds for which ingestion metrics were removed.
+ */
+ public Set<Integer> stopTrackingPartitionIngestionDelay() {
+ Set<Integer> removedPartitionIds = new HashSet<>();
Review Comment:
I feel it is safer to first initialize `removedPartitionIds = new
HashSet<>(_ingestionInfoMap.keySet())` to avoid iterating over and modifying
the same map at the same time, even though a concurrent map can probably
handle it.
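
A minimal sketch of what I mean, assuming `_ingestionInfoMap` is a `ConcurrentHashMap` keyed by partition id. The class here is a hypothetical stand-in for the tracker, with `String` in place of the real per-partition state and a simplified `removePartitionId`:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for IngestionDelayTracker, for illustration only.
public class SnapshotRemovalSketch {
  private final Map<Integer, String> _ingestionInfoMap = new ConcurrentHashMap<>();

  public void track(int partitionId, String info) {
    _ingestionInfoMap.put(partitionId, info);
  }

  // Simplified placeholder for the real per-partition cleanup.
  private void removePartitionId(int partitionId) {
    _ingestionInfoMap.remove(partitionId);
  }

  public Set<Integer> stopTrackingPartitionIngestionDelay() {
    // Copy the keys first, so the loop walks the snapshot and not the live map.
    Set<Integer> removedPartitionIds = new HashSet<>(_ingestionInfoMap.keySet());
    for (Integer partitionId : removedPartitionIds) {
      removePartitionId(partitionId);
    }
    return removedPartitionIds;
  }

  public int trackedPartitionCount() {
    return _ingestionInfoMap.size();
  }

  public static void main(String[] args) {
    SnapshotRemovalSketch tracker = new SnapshotRemovalSketch();
    tracker.track(0, "p0");
    tracker.track(1, "p1");
    System.out.println("Removed: " + tracker.stopTrackingPartitionIngestionDelay());
    System.out.println("Still tracked: " + tracker.trackedPartitionCount());
  }
}
```

The returned set then reflects the partitions present when the call started; partitions added concurrently during the loop are not removed by this call.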
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action =
+ Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionGroupID from serverInstance",
+ notes = "Removes ingestion-related metrics from serverInstance for
partition(s) under the specified table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics."),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public SuccessResponse removeIngestionMetrics(
+ @ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "Instance name of the server", required = true)
@PathParam("instanceName")
+ String instanceName,
+ @ApiParam(value = "Comma-separated list of partition group IDs
(optional)") @QueryParam("partitionGroupId")
+ Set<Integer> partitionGroupIds,
+ @Context HttpHeaders headers) {
+ try {
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ }
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ throw new ControllerApplicationException(LOGGER, "Table " +
tableNameWithType + " should be a realtime table.",
+ Response.Status.BAD_REQUEST);
+ }
+ String serverEndpoint;
+ try {
+ BiMap<String, String> dataInstanceAdminEndpoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName));
+ serverEndpoint = dataInstanceAdminEndpoints.get(instanceName);
+ Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found
for instance: " + instanceName);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to get server
endpoint for instance: " + instanceName,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ StringBuilder uriBuilder = new StringBuilder(serverEndpoint)
+ .append("/tables/")
+ .append(tableNameWithType)
+ .append("/ingestionMetrics");
+
+ if (partitionGroupIds != null && !partitionGroupIds.isEmpty()) {
Review Comment:
(minor)
```suggestion
if (CollectionUtils.isNotEmpty(partitionIds)) {
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int
partitionId) {
removePartitionId(partitionId);
}
+ /**
+ * Handles all partition removal event. This must be invoked when we stop
serving partitions for this table in the
+ * current server.
+ *
+ * @return Set of partitionIds for which ingestion metrics were removed.
+ */
+ public Set<Integer> stopTrackingPartitionIngestionDelay() {
Review Comment:
```suggestion
public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action =
+ Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionGroupID from serverInstance",
+ notes = "Removes ingestion-related metrics from serverInstance for
partition(s) under the specified table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics."),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public SuccessResponse removeIngestionMetrics(
+ @ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "Instance name of the server", required = true)
@PathParam("instanceName")
+ String instanceName,
+ @ApiParam(value = "Comma-separated list of partition group IDs
(optional)") @QueryParam("partitionGroupId")
+ Set<Integer> partitionGroupIds,
+ @Context HttpHeaders headers) {
+ try {
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ }
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ throw new ControllerApplicationException(LOGGER, "Table " +
tableNameWithType + " should be a realtime table.",
+ Response.Status.BAD_REQUEST);
+ }
+ String serverEndpoint;
+ try {
+ BiMap<String, String> dataInstanceAdminEndpoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName));
+ serverEndpoint = dataInstanceAdminEndpoints.get(instanceName);
+ Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found
for instance: " + instanceName);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to get server
endpoint for instance: " + instanceName,
+ Response.Status.INTERNAL_SERVER_ERROR);
Review Comment:
Should we count this as a bad request?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action =
+ Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionGroupID from serverInstance",
Review Comment:
Let's call it `partitionId`. I believe `partitionGroupId` was introduced for
historical reasons (probably the high-level consumer, which has already been
removed), so let's use `partitionId` going forward for conciseness.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>>
getLiveBrokers(@Context HttpHeaders heade
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action =
+ Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionGroupID from serverInstance",
+ notes = "Removes ingestion-related metrics from serverInstance for
partition(s) under the specified table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics."),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public SuccessResponse removeIngestionMetrics(
+ @ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "Instance name of the server", required = true)
@PathParam("instanceName")
+ String instanceName,
+ @ApiParam(value = "Comma-separated list of partition group IDs
(optional)") @QueryParam("partitionGroupId")
+ Set<Integer> partitionGroupIds,
Review Comment:
Mark it `@Nullable`
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -1192,4 +1193,44 @@ public List<StaleSegment> getStaleSegments(
throw new WebApplicationException(e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
}
}
+
+ @DELETE
+ @Path("/tables/{tableNameWithType}/ingestionMetrics")
Review Comment:
Same comments as on the controller endpoint.