9aman commented on code in PR #15299:
URL: https://github.com/apache/pinot/pull/15299#discussion_r2016334411
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1175,6 +1179,136 @@ public SuccessResponse updateTimeIntervalZK(
return updateZKTimeIntervalInternal(tableNameWithType);
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/deletePauselessSegments/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.DELETE_SEGMENT)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Delete segments from a pauseless enabled table",
notes =
+ "Deletes segments from a pauseless-enabled table based on the provided
segment names. "
+ + "For each segment provided, it identifies the partition and
deletes all segments "
+ + "with sequence numbers >= the provided segment in that partition. "
+ + "When force flag is true, it bypasses checks for pauseless being
enabled and table being paused. "
+ + "The retention period controls how long deleted segments are
retained before permanent removal. "
+ + "It follows this precedence: input parameter → table config →
cluster setting → 7d default. "
+ + "Use 0d or -1d for immediate deletion without retention.")
+ public SuccessResponse deletePauselessSegments(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "List of segment names. For each segment, all segments
with higher sequence IDs in the same "
+ + "partition will be deleted", required = true, allowMultiple = true)
+ @QueryParam("segments") List<String> segments,
+ @ApiParam(value = "Force flag to bypass checks for pauseless being
enabled and table being paused",
+ defaultValue = "false") @QueryParam("force") boolean force,
+ @Context HttpHeaders headers
+ ) {
+
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+
+
Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+ "Table should be a realtime table.");
+
+ // Validate input segments
+ if (segments == null || segments.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "Segment list must not
be empty", Status.BAD_REQUEST);
+ }
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+
+ if (!force) {
+ // Check if pauseless is enabled
+
Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig),
+ "Pauseless is not enabled for the table " + tableNameWithType);
+ // Check if the ingestion has been paused
+
Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager()
+ .getPauseStatusDetails(tableNameWithType)
+ .getPauseFlag(), "Table " + tableNameWithType + " should be paused
before deleting segments.");
+ }
+
+ IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Ideal State does not exist
for table " + tableNameWithType);
+
+ Map<Integer, LLCSegmentName> partitionToOldestSegment =
getPartitionIDToOldestSegment(segments);
+ Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+ Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap =
+ getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment,
idealState, partitionIdToLatestSegment);
+ for (Integer partitionID : partitionToOldestSegment.keySet()) {
+ Set<String> segmentToDeleteForPartition =
partitionIdToSegmentsToDeleteMap.get(partitionID);
+ LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for
partition: {}",
+ segmentToDeleteForPartition.size(),
partitionToOldestSegment.get(partitionID),
+ partitionIdToLatestSegment.get(partitionID), partitionID);
+ deleteSegmentsInternal(tableNameWithType, new
ArrayList<>(segmentToDeleteForPartition), null);
+ }
+
+ return new SuccessResponse("Successfully deleted segments for table: " +
tableNameWithType);
+ }
+
+ /**
+ * Identifies segments that need to be deleted based on partition and
sequence ID information.
+ *
+ * For each partition in the provided partitionToOldestSegment map, this
method identifies
+ * all segments with sequence IDs greater than or equal to the oldest
segment's sequence ID.
+ * It also tracks the latest segment (highest sequence ID) for each
partition, which is useful
+ * for logging purposes.
+ *
+ * @param partitionToOldestSegment Map of partition IDs to their
corresponding oldest segment (lowest sequence ID)
+ * that serves as the threshold for deletion.
All segments with sequence IDs
+ * greater than or equal to this will be
selected for deletion.
+ * @param idealState The table's ideal state which contains information
about all existing segments.
+ * @param partitionIdToLatestSegment A map that will be populated with the
latest segment (highest sequence ID)
+ * for each partition. This is passed by
reference and modified by this method.
+ *
+ * @return A map from partition IDs to sets of segment names that should be
deleted.
+ * Each set contains all segments with sequence IDs >= the oldest
segment's sequence ID
+ * for that particular partition.
+ */
+ @VisibleForTesting
+ Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap(
+ Map<Integer, LLCSegmentName> partitionToOldestSegment,
+ IdealState idealState, Map<Integer, LLCSegmentName>
partitionIdToLatestSegment) {
+
+ // Find segments to delete (those with higher sequence numbers)
+ Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>();
+ Map<String, Map<String, String>> segmentsToInstanceState =
idealState.getRecord().getMapFields();
+
+ for (String segmentName : segmentsToInstanceState.keySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionId = llcSegmentName.getPartitionGroupId();
+
+ LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId);
+ if (oldestSegment != null && oldestSegment.getSequenceNumber() <=
llcSegmentName.getSequenceNumber()) {
+ partitionToSegmentsToDelete
+ .computeIfAbsent(partitionId, k -> new HashSet<>())
+ .add(segmentName);
+ }
+
+ // Track latest segment (segment with highest sequence ID)
+ LLCSegmentName currentLatest =
partitionIdToLatestSegment.get(partitionId);
+ if (currentLatest == null || llcSegmentName.getSequenceNumber() >
currentLatest.getSequenceNumber()) {
+ partitionIdToLatestSegment.put(partitionId, llcSegmentName);
+ }
+ }
+
+ return partitionToSegmentsToDelete;
+ }
+
+ @VisibleForTesting
+ Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String>
segments) {
+ Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>();
+
+ for (String segment : segments) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
Review Comment:
`Also a minor same table name check`
Done. Thanks for pointing this out.
I have updated the UTs to reflect this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]