This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eacd6c058a Allows segments deletion in bulk for pauseless tables
(#15299)
eacd6c058a is described below
commit eacd6c058a4795541cd1759793734f8b5590e372
Author: 9aman <[email protected]>
AuthorDate: Thu Mar 27 19:43:32 2025 +0530
Allows segments deletion in bulk for pauseless tables (#15299)
---
.../api/resources/PinotSegmentRestletResource.java | 145 ++++++++++++++++++++-
.../resources/PinotSegmentRestletResourceTest.java | 94 +++++++++++++
2 files changed, 238 insertions(+), 1 deletion(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 600c75b718..a317391766 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -37,6 +37,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,6 +65,7 @@ import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.InvalidConfigException;
@@ -75,6 +77,7 @@ import
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckRespon
import
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.controller.ControllerConf;
@@ -891,7 +894,8 @@ public class PinotSegmentRestletResource {
return new SuccessResponse("Deleted " + numSegments + " segments from
table: " + tableName);
}
- private void deleteSegmentsInternal(String tableNameWithType, List<String>
segments, String retentionPeriod) {
+ private void deleteSegmentsInternal(String tableNameWithType, List<String>
segments,
+ @Nullable String retentionPeriod) {
PinotResourceManagerResponse response =
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
retentionPeriod);
if (!response.isSuccessful()) {
@@ -1175,6 +1179,145 @@ public class PinotSegmentRestletResource {
return updateZKTimeIntervalInternal(tableNameWithType);
}
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/deletePauselessSegments/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.DELETE_SEGMENT)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Delete segments from a pauseless enabled table",
notes =
+ "Deletes segments from a pauseless-enabled table based on the provided
segment names. "
+ + "For each segment provided, it identifies the partition and
deletes all segments "
+ + "with sequence numbers >= the provided segment in that partition. "
+ + "When force flag is true, it bypasses checks for pauseless being
enabled and table being paused. "
+ + "The retention period controls how long deleted segments are
retained before permanent removal. "
+ + "It follows this precedence: input parameter → table config →
cluster setting → 7d default. "
+ + "Use 0d or -1d for immediate deletion without retention.")
+ public SuccessResponse deletePauselessSegments(
+ @ApiParam(value = "Name of the table with type", required = true)
@PathParam("tableNameWithType")
+ String tableNameWithType,
+ @ApiParam(value = "List of segment names. For each segment, all segments
with higher sequence IDs in the same "
+ + "partition will be deleted", required = true, allowMultiple = true)
+ @QueryParam("segments") List<String> segments,
+ @ApiParam(value = "Force flag to bypass checks for pauseless being
enabled and table being paused",
+ defaultValue = "false") @QueryParam("force") boolean force,
+ @Context HttpHeaders headers
+ ) {
+
+ tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
+
+
Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+ "Table should be a realtime table.");
+
+ // Validate input segments
+ if (segments == null || segments.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, "Segment list must not
be empty", Status.BAD_REQUEST);
+ }
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+
+ if (!force) {
+ // Check if pauseless is enabled
+
Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig),
+ "Pauseless is not enabled for the table " + tableNameWithType);
+ // Check if the ingestion has been paused
+
Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager()
+ .getPauseStatusDetails(tableNameWithType)
+ .getPauseFlag(), "Table " + tableNameWithType + " should be paused
before deleting segments.");
+ }
+
+ IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ Preconditions.checkState(idealState != null, "Ideal State does not exist
for table " + tableNameWithType);
+
+ Set<String> idealStateSegmentsSet =
idealState.getRecord().getMapFields().keySet();
+ Map<Integer, LLCSegmentName> partitionToOldestSegment =
+ getPartitionIDToOldestSegment(segments, idealStateSegmentsSet);
+ Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+ Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap =
+ getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment,
idealStateSegmentsSet,
+ partitionIdToLatestSegment);
+ for (Integer partitionID : partitionToOldestSegment.keySet()) {
+ Set<String> segmentToDeleteForPartition =
partitionIdToSegmentsToDeleteMap.get(partitionID);
+ LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for
partition: {}",
+ segmentToDeleteForPartition.size(),
partitionToOldestSegment.get(partitionID),
+ partitionIdToLatestSegment.get(partitionID), partitionID);
+ deleteSegmentsInternal(tableNameWithType, new
ArrayList<>(segmentToDeleteForPartition), null);
+ }
+
+ return new SuccessResponse("Successfully deleted segments for table: " +
tableNameWithType);
+ }
+
+ /**
+ * Identifies segments that need to be deleted based on partition and
sequence ID information.
+ *
+ * For each partition in the provided partitionToOldestSegment map, this
method identifies
+ * all segments with sequence IDs greater than or equal to the oldest
segment's sequence ID.
+ * It also tracks the latest segment (highest sequence ID) for each
partition, which is useful
+ * for logging purposes.
+ *
+ * @param partitionToOldestSegment Map of partition IDs to their
corresponding oldest segment (lowest sequence ID)
+ * that serves as the threshold for deletion.
All segments with sequence IDs
+ * greater than or equal to this will be
selected for deletion.
+ * @param idealStateSegmentsSet The segments present in the ideal state for
the table
+ * @param partitionIdToLatestSegment A map that will be populated with the
latest segment (highest sequence ID)
+ * for each partition. This is passed by
reference and modified by this method.
+ *
+ * @return A map from partition IDs to sets of segment names that should be
deleted.
+ * Each set contains all segments with sequence IDs >= the oldest
segment's sequence ID
+ * for that particular partition.
+ */
+ @VisibleForTesting
+ Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap(
+ Map<Integer, LLCSegmentName> partitionToOldestSegment,
+ Set<String> idealStateSegmentsSet, Map<Integer, LLCSegmentName>
partitionIdToLatestSegment) {
+
+ // Find segments to delete (those with higher sequence numbers)
+ Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>();
+
+ for (String segmentName : idealStateSegmentsSet) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionId = llcSegmentName.getPartitionGroupId();
+
+ LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId);
+ if (oldestSegment != null && oldestSegment.getSequenceNumber() <=
llcSegmentName.getSequenceNumber()) {
+ partitionToSegmentsToDelete
+ .computeIfAbsent(partitionId, k -> new HashSet<>())
+ .add(segmentName);
+ }
+
+ // Track latest segment (segment with highest sequence ID)
+ LLCSegmentName currentLatest =
partitionIdToLatestSegment.get(partitionId);
+ if (currentLatest == null || llcSegmentName.getSequenceNumber() >
currentLatest.getSequenceNumber()) {
+ partitionIdToLatestSegment.put(partitionId, llcSegmentName);
+ }
+ }
+
+ return partitionToSegmentsToDelete;
+ }
+
+ @VisibleForTesting
+ Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String>
segments, Set<String> idealStateSegmentsSet) {
+ Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>();
+
+ for (String segment : segments) {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+ Preconditions.checkState(llcSegmentName != null, "Invalid LLC segment: "
+ segment);
+
+ // ignore segments that are not present in the ideal state
+ if (!idealStateSegmentsSet.contains(segment)) {
+ LOGGER.warn("Segment: {} is not present in the ideal state", segment);
+ continue;
+ }
+ int partitionId = llcSegmentName.getPartitionGroupId();
+
+ LLCSegmentName currentOldest = partitionToOldestSegment.get(partitionId);
+ if (currentOldest == null || llcSegmentName.getSequenceNumber() <
currentOldest.getSequenceNumber()) {
+ partitionToOldestSegment.put(partitionId, llcSegmentName);
+ }
+ }
+
+ return partitionToOldestSegment;
+ }
+
/**
* Internal method to update schema
* @param tableNameWithType name of the table
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 392fc05bd8..a54cb3d884 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -20,9 +20,14 @@ package org.apache.pinot.controller.api.resources;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@@ -30,6 +35,7 @@ import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -85,4 +91,92 @@ public class PinotSegmentRestletResourceTest {
assertTrue(e.getMessage().contains("Only one segment is expected but
got: [seg01, seg02]"));
}
}
+
+ @Test
+ public void testGetPartitionIdToSegmentsToDeleteMap() {
+ IdealState idealState = mock(IdealState.class);
+ ZNRecord znRecord = mock(ZNRecord.class);
+ String tableName = "testTable";
+ long currentTime = System.currentTimeMillis();
+ Map<String, Map<String, String>> segmentsToInstanceState = new HashMap<>();
+
+ // Add segments for partition 0
+ for (String segment : getSegmentForPartition(tableName, 0, 0, 10,
currentTime)) {
+ segmentsToInstanceState.put(segment, null);
+ }
+
+ // Add segments for partition 1
+ for (String segment : getSegmentForPartition(tableName, 1, 0, 10,
currentTime)) {
+ segmentsToInstanceState.put(segment, null);
+ }
+
+ // Mock response for fetching segment to instance state map
+ when(idealState.getRecord()).thenReturn(znRecord);
+ when(znRecord.getMapFields()).thenReturn(segmentsToInstanceState);
+
+ // Create the partition to oldest segment map
+ Map<Integer, LLCSegmentName> partitionToOldestSegment = Map.of(
+ 0, new LLCSegmentName(tableName, 0, 3, currentTime),
+ 1, new LLCSegmentName(tableName, 1, 5, currentTime)
+ );
+
+ // This map will be populated by the method
+ Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+
+ // Create the expected response map
+ Map<Integer, Set<String>> expectedResponse = new HashMap<>();
+ expectedResponse.put(0,
+ getSegmentForPartition(tableName, 0, 3, 7,
currentTime).stream().collect(Collectors.toSet()));
+ expectedResponse.put(1,
+ getSegmentForPartition(tableName, 1, 5, 5,
currentTime).stream().collect(Collectors.toSet()));
+
+ // Call the method and check the result
+ Map<Integer, Set<String>> result =
_pinotSegmentRestletResource.getPartitionIdToSegmentsToDeleteMap(
+ partitionToOldestSegment, segmentsToInstanceState.keySet(),
partitionIdToLatestSegment);
+
+ assertEquals(expectedResponse, result);
+
+ // Verify that partitionIdToLatestSegment has been populated with the
latest segment for each partition
+ assertEquals(2, partitionIdToLatestSegment.size());
+ assertEquals(9, partitionIdToLatestSegment.get(0).getSequenceNumber());
+ assertEquals(9, partitionIdToLatestSegment.get(1).getSequenceNumber());
+ }
+
+ @Test
+ public void testGetPartitionIDToOldestSegment() {
+ List<String> segments = new ArrayList<>();
+ String tableName = "testTable";
+ long currentTime = System.currentTimeMillis();
+
+ // Add segments for testing
+ segments.addAll(getSegmentForPartition(tableName, 0, 3, 3, currentTime));
// Segments with seq 3,4,5 for partition 0
+ segments.addAll(getSegmentForPartition(tableName, 1, 4, 2, currentTime));
// Segments with seq 4,5 for partition 1
+
+ // Only add the above segment to the ideal state segment list
+ Set<String> idealStateSegmentSet = new HashSet<>(segments);
+
+ // Add a segment from another table to this list that has lower sequence
ID for the above partitions
+ segments.addAll(
+ getSegmentForPartition(tableName + "fake", 0, 1, 3, currentTime)); //
Segments with seq 1,2,3 for partition 0
+
+ // Create expected result map
+ Map<Integer, LLCSegmentName> expectedResult = new HashMap<>();
+ expectedResult.put(0, new LLCSegmentName(tableName, 0, 3, currentTime));
+ expectedResult.put(1, new LLCSegmentName(tableName, 1, 4, currentTime));
+
+ // Call the method and check the result
+ Map<Integer, LLCSegmentName> result =
+ _pinotSegmentRestletResource.getPartitionIDToOldestSegment(segments,
idealStateSegmentSet);
+
+ assertEquals(expectedResult, result);
+ }
+
+ private List<String> getSegmentForPartition(String tableName, int
partitionID, int sequenceNumberOffset,
+ int numberOfSegments, long currentTime) {
+ List<String> segments = new ArrayList<>();
+ for (int i = sequenceNumberOffset; i < sequenceNumberOffset +
numberOfSegments; i++) {
+ segments.add(new LLCSegmentName(tableName, partitionID, i,
currentTime).getSegmentName());
+ }
+ return segments;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]