noob-se7en commented on code in PR #18285:
URL: https://github.com/apache/pinot/pull/18285#discussion_r3136327949


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -451,4 +466,111 @@ public static ValidDocIdsType getValidDocIdsType(UpsertConfig upsertConfig, Map<
     }
     return validDocIdsType;
   }
+
+  /**
+   * Filters out segments that are past (or near) the table's retention period. This prevents task generators from
+   * selecting segments that RetentionManager may delete before the task executor downloads them.
+   * <p>
+   * Uses the same retention logic as {@code TimeRetentionStrategy}: a segment is considered expired if
+   * {@code currentTimeMs - endTimeMs > effectiveRetentionMs}, where effectiveRetentionMs is
+   * {@code retentionMs - bufferMs}.
+   * <p>
+   * If {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY} is set in {@code taskConfigs}, the effective retention is reduced
+   * by that amount, excluding segments earlier — before RetentionManager actually deletes them. This is a table-level
+   * config, not a per-merge-level config, because retention itself is table-level.
+   * <p>
+   * <b>Note on hybrid tables:</b> This method reads only the table-level retention config
+   * ({@code segmentsConfig.retentionTimeUnit/Value}). It does not account for hybrid retention strategies that use
+   * the offline table's time boundary. If hybrid retention is enabled (off by default), RetentionManager may use a
+   * different deletion boundary than what this method computes, so the filter may not perfectly match the controller's
+   * deletion decisions for hybrid tables.
+   * <p>
+   * <b>Watermark impact (MergeRollupTask):</b> This filter runs before watermark advancement. If all segments in an
+   * early time bucket are filtered out, the watermark will advance past them permanently. This is a one-way door but
+   * is expected: those segments would be purged by RetentionManager regardless. If this is caused by a misconfigured
+   * {@code retentionExpiryBufferPeriod}, correcting the config will not recover already-skipped buckets.
+   *
+   * @apiNote Callers are expected to pass only completed segments (status DONE or UPLOADED). This method does not
+   * check segment status, unlike {@code TimeRetentionStrategy.isPurgeable()} which skips incomplete segments. All
+   * current callers ({@code UpsertCompactMergeTaskGenerator.getCandidateSegments},
+   * {@code MergeRollupTaskGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable}) already guarantee this.
+   *
+   * @param segments      the candidate segments to filter (must not be null)
+   * @param tableConfig   the table config containing retention settings
+   * @param taskConfigs   task-level configs; may contain {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY}. Null if
+   *                      unavailable.
+   * @param currentTimeMs the current time in milliseconds (pass {@code System.currentTimeMillis()})
+   * @return filtered list excluding segments past effective retention; returns the original list if retention is not
+   *         configured or cannot be parsed
+   */
+  public static List<SegmentZKMetadata> filterSegmentsPastRetention(List<SegmentZKMetadata> segments,
+      TableConfig tableConfig, @Nullable Map<String, String> taskConfigs, long currentTimeMs) {
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (retentionTimeUnit == null || retentionTimeValue == null) {
+      return segments;
+    }
+
+    long retentionMs;
+    try {
+      retentionMs = TimeUnit.valueOf(retentionTimeUnit.toUpperCase()).toMillis(Long.parseLong(retentionTimeValue));
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse retention config for table: {}, skipping retention filter",
+          tableConfig.getTableName(), e);
+      return segments;
+    }
+
+    if (retentionMs <= 0) {
+      LOGGER.warn("Retention is non-positive ({}ms) for table: {}, skipping retention filter",
+          retentionMs, tableConfig.getTableName());
+      return segments;
+    }
+
+    long bufferMs = 0;
+    if (taskConfigs != null) {
+      String bufferPeriod = taskConfigs.get(RETENTION_EXPIRY_BUFFER_PERIOD_KEY);
+      if (bufferPeriod != null) {
+        try {
+          bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+          if (bufferMs < 0) {
+            LOGGER.warn("Invalid retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+                bufferPeriod, tableConfig.getTableName());
+            bufferMs = 0;
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Failed to parse retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+              bufferPeriod, tableConfig.getTableName(), e);
+        }
+      }
+    }
+
+    long effectiveRetentionMs = retentionMs - bufferMs;
+    if (effectiveRetentionMs <= 0) {
+      LOGGER.warn("retentionExpiryBufferPeriod ({}) >= retention ({}ms) for table: {}, skipping retention filter",
+          bufferMs, retentionMs, tableConfig.getTableName());
+      return segments;
+    }
+
+    String tableNameWithType = tableConfig.getTableName();
+    List<SegmentZKMetadata> filtered = new ArrayList<>();
+    int excludedCount = 0;
+    for (SegmentZKMetadata segment : segments) {
+      long endTimeMs = segment.getEndTimeMs();
+      if (!TimeUtils.timeValueInValidRange(endTimeMs)) {

Review Comment:
   We could have a single util method, `isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata)`. The existing method in TimeRetentionStrategy with the same signature can then delegate to the new util method, and the util's implementation can stay the same as the original one.
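   A minimal sketch of what this refactor could look like, under stated assumptions. To keep it self-contained, `Segment` and `Status` are hypothetical stand-ins for `SegmentZKMetadata` and its status enum, and `RetentionUtils` is a hypothetical home for the shared method. Because a static util has no `_retentionMs` field, the sketch passes `retentionMs` and `currentTimeMs` explicitly; that extra parameter list is an assumption, not the exact signature suggested above. The `endTimeMs <= 0` check is a simplified stand-in for `TimeUtils.timeValueInValidRange`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RetentionSketch {

  // Hypothetical stand-ins for SegmentZKMetadata and its status enum.
  enum Status { IN_PROGRESS, DONE, UPLOADED }

  record Segment(String name, Status status, long endTimeMs) { }

  // Hypothetical shared util holding the single retention predicate.
  static final class RetentionUtils {
    private RetentionUtils() { }

    static boolean isPurgeable(String tableNameWithType, Segment segment, long retentionMs, long currentTimeMs) {
      // Incomplete segments are never purgeable.
      if (segment.status() != Status.DONE && segment.status() != Status.UPLOADED) {
        return false;
      }
      long endTimeMs = segment.endTimeMs();
      // Simplified stand-in for TimeUtils.timeValueInValidRange(endTimeMs).
      if (endTimeMs <= 0) {
        System.err.printf("Segment %s of table %s has invalid end time: %d%n",
            segment.name(), tableNameWithType, endTimeMs);
        return false;
      }
      return currentTimeMs - endTimeMs > retentionMs;
    }
  }

  // The strategy keeps its per-table retention and delegates the decision to the util.
  static final class TimeRetentionStrategy {
    private final long _retentionMs;

    TimeRetentionStrategy(TimeUnit timeUnit, long timeValue) {
      _retentionMs = timeUnit.toMillis(timeValue);
    }

    boolean isPurgeable(String tableNameWithType, Segment segment, long currentTimeMs) {
      return RetentionUtils.isPurgeable(tableNameWithType, segment, _retentionMs, currentTimeMs);
    }
  }

  // The task-generator filter reuses the same predicate with the buffered (effective) retention.
  static List<Segment> filterSegmentsPastRetention(String tableNameWithType, List<Segment> segments,
      long retentionMs, long bufferMs, long currentTimeMs) {
    long effectiveRetentionMs = retentionMs - bufferMs;
    if (effectiveRetentionMs <= 0) {
      return segments; // skip the filter when the buffer swallows the whole retention period
    }
    List<Segment> filtered = new ArrayList<>();
    for (Segment segment : segments) {
      if (!RetentionUtils.isPurgeable(tableNameWithType, segment, effectiveRetentionMs, currentTimeMs)) {
        filtered.add(segment);
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    long day = TimeUnit.DAYS.toMillis(1);
    long now = 100 * day;
    List<Segment> candidates = List.of(
        new Segment("seg_old", Status.DONE, 60 * day),
        new Segment("seg_recent", Status.UPLOADED, 80 * day));
    // 30-day retention, no buffer: seg_old ended 40 days before now and is filtered out.
    System.out.println(filterSegmentsPastRetention("myTable_OFFLINE", candidates, 30 * day, 0, now));
  }
}
```

   Running `main` prints a list containing only `seg_recent`. One possible side effect of sharing the predicate: an incomplete segment is never purgeable, so the filter would keep it rather than relying on callers to pre-filter by status as the `@apiNote` currently requires.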
   
   


