This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 127f4c3  Add select segments API (#7651)
127f4c3 is described below

commit 127f4c3b6966686ae984ba47dbd01a4432184baf
Author: Jialiang Li <[email protected]>
AuthorDate: Thu Oct 28 11:17:55 2021 -0700

    Add select segments API (#7651)
    
    Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
 .../api/resources/PinotSegmentRestletResource.java | 41 +++++++++++++
 .../helix/core/PinotHelixResourceManager.java      | 68 ++++++++++++++++++++++
 2 files changed, 109 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2628f01..ba569d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -657,6 +658,46 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
+  @GET
+  @Path("segments/{tableName}/select")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the selected segments given the (inclusive) start 
and (exclusive) end timestamps"
+      + " in milliseconds. These timestamps will be compared against the 
minmax values of the time column in each"
+      + " segment. If the table is a refresh use case, the value of start and 
end timestamp is voided,"
+      + " since there is no time column for refresh use case; instead, the 
whole qualified segments will be returned."
+      + " If no timestamps are provided, all the qualified segments will be 
returned."
+      + " For the segments that partially belong to the time range, the 
boolean flag 'excludeOverlapping' is introduced"
+      + " in order for user to determine whether to exclude this kind of 
segments in the response.",
+      notes = "Get the selected segments given the start and end timestamps in 
milliseconds")
+  public List<Map<TableType, List<String>>> getSelectedSegments(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      @ApiParam(value = "Start timestamp (inclusive)") 
@QueryParam("startTimestamp") @DefaultValue("")
+          String startTimestampStr,
+      @ApiParam(value = "End timestamp (exclusive)") 
@QueryParam("endTimestamp") @DefaultValue("")
+          String endTimestampStr,
+      @ApiParam(value = "Whether to exclude the segments overlapping with the 
timestamps, false by default")
+      @QueryParam("excludeOverlapping") @DefaultValue("false") boolean 
excludeOverlapping) {
+    long startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? 
Long.MIN_VALUE : Long.parseLong(startTimestampStr);
+    long endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? 
Long.MAX_VALUE : Long.parseLong(endTimestampStr);
+    Preconditions.checkArgument(startTimestamp < endTimestamp,
+        "The value of startTimestamp should be smaller than the one of 
endTimestamp. Start timestamp: %d. End "
+            + "timestamp: %d",
+        startTimestamp, endTimestamp);
+
+    List<String> tableNamesWithType = ResourceUtils
+        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, 
Constants.validateTableType(tableTypeStr),
+            LOGGER);
+    List<Map<TableType, List<String>>> resultList = new 
ArrayList<>(tableNamesWithType.size());
+    for (String tableNameWithType : tableNamesWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      List<String> segments = _pinotHelixResourceManager
+          .getSegmentsForTableWithTimestamps(tableNameWithType, 
startTimestamp, endTimestamp, excludeOverlapping);
+      resultList.add(Collections.singletonMap(tableType, segments));
+    }
+    return resultList;
+  }
+
   /**
    * This is a helper method to get the metadata for all segments for a given 
table name.
    * @param tableNameWithType name of the table along with its type
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8686671..189c02c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -593,6 +593,74 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
   }
 
+  /**
+   * Returns the segments for the given table based on the start and end 
timestamp.
+   *
+   * @param tableNameWithType  Table name with type suffix
+   * @param startTimestamp  start timestamp in milliseconds (inclusive)
+   * @param endTimestamp  end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping 
with the timestamps
+   */
+  public List<String> getSegmentsForTableWithTimestamps(String 
tableNameWithType, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    List<String> selectedSegments;
+    // If no start and end timestamp specified, just select all the segments.
+    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+      selectedSegments = getSegmentsFor(tableNameWithType);
+    } else {
+      selectedSegments = new ArrayList<>();
+      List<SegmentZKMetadata> segmentZKMetadataList = 
getSegmentsZKMetadata(tableNameWithType);
+      for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+        String segmentName = segmentZKMetadata.getSegmentName();
+        if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, 
endTimestamp, excludeOverlapping)) {
+          selectedSegments.add(segmentName);
+        }
+      }
+    }
+    // Fetch the segment lineage metadata, and filter segments based on 
segment lineage.
+    ZNRecord segmentLineageZNRecord =
+        SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+    SegmentLineage segmentLineage = 
SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+    Set<String> selectedSegmentSet = new HashSet<>(selectedSegments);
+    
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, 
segmentLineage);
+    return new ArrayList<>(selectedSegmentSet);
+  }
+
+  /**
+   * Checks whether the segment is within the time range between the start and 
end timestamps.
+   * @param segmentMetadata  the segmentMetadata associated with the segment
+   * @param startTimestamp  start timestamp
+   * @param endTimestamp  end timestamp
+   * @param excludeOverlapping  whether to exclude the segments overlapping 
with the timestamps
+   */
+  private boolean isSegmentWithinTimeStamps(SegmentZKMetadata segmentMetadata, 
long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    if (segmentMetadata == null) {
+      return false;
+    }
+    long startTimeMsInSegment = segmentMetadata.getStartTimeMs();
+    long endTimeMsInSegment = segmentMetadata.getEndTimeMs();
+    if (startTimeMsInSegment == -1 && endTimeMsInSegment == -1) {
+      // No time column specified in the metadata and no minmax value either.
+      return true;
+    }
+    if (startTimeMsInSegment > endTimeMsInSegment) {
+      LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. 
End time: {}",
+          segmentMetadata.getSegmentName(), startTimeMsInSegment, 
endTimeMsInSegment);
+      return false;
+    }
+    if (startTimestamp <= startTimeMsInSegment && endTimeMsInSegment < 
endTimestamp) {
+      // The segment is within the start and end time range.
+      return true;
+    } else if (endTimeMsInSegment < startTimestamp || startTimeMsInSegment >= 
endTimestamp) {
+      // The segment is outside of the start and end time range.
+      return false;
+    }
+    // If the segment happens to overlap with the start and end time range,
+    // check the excludeOverlapping flag to determine whether to include the 
segment.
+    return !excludeOverlapping;
+  }
+
   @Nullable
   public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, 
String segmentName) {
     return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentName);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to