This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 87f70ffe1d Adds remove ingestion-metrics API (#16045)
87f70ffe1d is described below
commit 87f70ffe1db7a5ff301fd01ab73c83fe34da433b
Author: NOOB <[email protected]>
AuthorDate: Tue Jun 17 00:26:53 2025 +0530
Adds remove ingestion-metrics API (#16045)
---
.../api/resources/PinotTableInstances.java | 71 ++++++++++++++++++++++
.../java/org/apache/pinot/core/auth/Actions.java | 1 +
.../manager/realtime/IngestionDelayTracker.java | 16 +++++
.../manager/realtime/RealtimeTableDataManager.java | 10 +++
.../pinot/server/api/resources/TablesResource.java | 41 +++++++++++++
5 files changed, 139 insertions(+)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
index 15d1ca0504..419b5f1b6f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
@@ -20,6 +20,8 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
@@ -29,9 +31,15 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.net.URI;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.inject.Inject;
+import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -42,8 +50,13 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.Actions;
@@ -186,4 +199,62 @@ public class PinotTableInstances {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
}
}
+
+ @DELETE
+ @Path("/tables/{tableName}/{instanceId}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.DELETE_INGESTION_METRICS)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Remove realtime ingestion metrics emitted per
partitionId from serverInstance", notes =
+ "Removes ingestion-related metrics from serverInstance for partition(s)
under the specified table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics."),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public SuccessResponse removeIngestionMetrics(
+ @ApiParam(value = "Table name", required = true) @PathParam("tableName")
String tableName,
+ @ApiParam(value = "Instance id of the server", required = true)
@PathParam("instanceId") String instanceId,
+ @ApiParam(value = "List of Partition Ids (optional)")
@QueryParam("partitionId") @Nullable
+ Set<Integer> partitionIds,
+ @Context HttpHeaders headers) {
+ try {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType =
+
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, TableType.REALTIME, LOGGER)
+ .get(0);
+ String serverEndpoint;
+ try {
+ BiMap<String, String> dataInstanceAdminEndpoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceId));
+ serverEndpoint = dataInstanceAdminEndpoints.get(instanceId);
+ Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found
for instance: " + instanceId);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to get server
endpoint for instance: " + instanceId,
+ Response.Status.BAD_REQUEST);
+ }
+ StringBuilder uriBuilder = new StringBuilder(serverEndpoint)
+ .append("/tables/")
+ .append(tableNameWithType)
+ .append("/ingestionMetrics");
+
+ if (CollectionUtils.isNotEmpty(partitionIds)) {
+ String query = partitionIds.stream()
+ .map(id -> "partitionId=" + id)
+ .collect(Collectors.joining("&"));
+ uriBuilder.append("?").append(query);
+ }
+
+ String fullUrl = uriBuilder.toString();
+ SimpleHttpResponse simpleHttpResponse;
+ try {
+ simpleHttpResponse =
+
HttpClient.wrapAndThrowHttpException(HttpClient.getInstance().sendDeleteRequest(URI.create(fullUrl)));
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return new SuccessResponse(simpleHttpResponse.getResponse());
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 6dec9b7903..000aab90c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -120,6 +120,7 @@ public class Actions {
public static final String DELETE_SEGMENT = "DeleteSegment";
public static final String DELETE_TABLE = "DeleteTable";
public static final String DELETE_TIME_BOUNDARY = "DeleteTimeBoundary";
+ public static final String DELETE_INGESTION_METRICS =
"DeleteIngestionMetrics";
public static final String DISABLE_TABLE = "DisableTable";
public static final String DOWNLOAD_SEGMENT = "DownloadSegment";
public static final String ENABLE_TABLE = "EnableTable";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 2b52b29f2d..a4cbef0488 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -23,6 +23,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -212,6 +213,7 @@ public class IngestionDelayTracker {
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+ LOGGER.info("Successfully removed ingestion metrics for partition id:
{}", partitionId);
}
return null;
});
@@ -319,6 +321,20 @@ public class IngestionDelayTracker {
removePartitionId(partitionId);
}
+ /**
+ * Handles all partition removal event. This must be invoked when we stop
serving partitions for this table in the
+ * current server.
+ *
+ * @return Set of partitionIds for which ingestion metrics were removed.
+ */
+ public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
+ Set<Integer> removedPartitionIds = new
HashSet<>(_ingestionInfoMap.keySet());
+ for (Integer partitionId : _ingestionInfoMap.keySet()) {
+ removePartitionId(partitionId);
+ }
+ return removedPartitionIds;
+ }
+
/**
* Stops tracking the partition ingestion delay, and also ignores the
updates from the given segment. This is useful
* when we want to stop tracking the ingestion delay for a partition when
the segment might still be consuming, e.g.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 277fd5b910..e56e937b99 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -509,6 +509,16 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
+ public Set<Integer> stopTrackingPartitionIngestionDelay(@Nullable
Set<Integer> partitionIds) {
+ if (CollectionUtils.isEmpty(partitionIds)) {
+ return
_ingestionDelayTracker.stopTrackingIngestionDelayForAllPartitions();
+ }
+ for (Integer partitionId: partitionIds) {
+ _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
+ }
+ return partitionIds;
+ }
+
private void doAddConsumingSegment(String segmentName)
throws Exception {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index da822cef86..d25b4b872c 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -41,8 +41,10 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
+import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
@@ -1192,4 +1194,43 @@ public class TablesResource {
throw new WebApplicationException(e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
}
}
+
+ @DELETE
+ @Path("/tables/{tableName}/ingestionMetrics")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Remove ingestion metrics for partition(s)", notes =
"Removes ingestion-related metrics for "
+ + "the given table. If no partitionId is provided, metrics for all
partitions hosted by this server will be "
+ + "removed.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully removed ingestion
metrics"),
+ @ApiResponse(code = 500, message = "Internal Server Error")
+ })
+ public String removeIngestionMetrics(
+ @ApiParam(value = "Table name", required = true) @PathParam("tableName")
String tableName,
+ @Nullable @ApiParam(value = "List of partition Ids (optional)")
@QueryParam("partitionId")
+ Set<Integer> partitionIds,
+ @Context HttpHeaders headers) {
+ try {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ } catch (Exception e) {
+ throw new WebApplicationException(e.getMessage(),
Response.Status.BAD_REQUEST);
+ }
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ TableDataManager tableDataManager =
+ ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
+ try {
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ RealtimeTableDataManager realtimeTableDataManager =
(RealtimeTableDataManager) tableDataManager;
+ Set<Integer> removedPartitionIds =
realtimeTableDataManager.stopTrackingPartitionIngestionDelay(partitionIds);
+ return "Successfully removed ingestion metrics for partitions: " +
removedPartitionIds + " in table: "
+ + tableNameWithType;
+ } else {
+ throw new WebApplicationException(
+ "TableDataManager is not RealtimeTableDataManager for table: " +
tableNameWithType,
+ Response.Status.BAD_REQUEST);
+ }
+ } catch (Exception e) {
+ throw new WebApplicationException(e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]