jineshparakh commented on code in PR #18285:
URL: https://github.com/apache/pinot/pull/18285#discussion_r3136023670


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -451,4 +466,111 @@ public static ValidDocIdsType 
getValidDocIdsType(UpsertConfig upsertConfig, Map<
     }
     return validDocIdsType;
   }
+
+  /**
+   * Filters out segments that are past (or near) the table's retention 
period. This prevents task generators from
+   * selecting segments that RetentionManager may delete before the task 
executor downloads them.
+   * <p>
+   * Uses the same retention logic as {@code TimeRetentionStrategy}: a segment 
is considered expired if
+   * {@code currentTimeMs - endTimeMs > effectiveRetentionMs}, where 
effectiveRetentionMs is
+   * {@code retentionMs - bufferMs}.
+   * <p>
+   * If {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY} is set in {@code 
taskConfigs}, the effective retention is reduced
+   * by that amount, excluding segments earlier — before RetentionManager 
actually deletes them. This is a table-level
+   * config, not a per-merge-level config, because retention itself is 
table-level.
+   * <p>
+   * <b>Note on hybrid tables:</b> This method reads only the table-level 
retention config
+   * ({@code segmentsConfig.retentionTimeUnit/Value}). It does not account for 
hybrid retention strategies that use
+   * the offline table's time boundary. If hybrid retention is enabled (off by 
default), RetentionManager may use a
+   * different deletion boundary than what this method computes, so the filter 
may not perfectly match the controller's
+   * deletion decisions for hybrid tables.
+   * <p>
+   * <b>Watermark impact (MergeRollupTask):</b> This filter runs before 
watermark advancement. If all segments in an
+   * early time bucket are filtered out, the watermark will advance past them 
permanently. This is a one-way door but
+   * is expected: those segments would be purged by RetentionManager 
regardless. If this is caused by a misconfigured
+   * {@code retentionExpiryBufferPeriod}, correcting the config will not 
recover already-skipped buckets.
+   *
+   * @apiNote Callers are expected to pass only completed segments (status 
DONE or UPLOADED). This method does not
+   * check segment status, unlike {@code TimeRetentionStrategy.isPurgeable()} 
which skips incomplete segments. All
+   * current callers ({@code 
UpsertCompactMergeTaskGenerator.getCandidateSegments},
+   * {@code 
MergeRollupTaskGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable}) 
already guarantee this.
+   *
+   * @param segments      the candidate segments to filter (must not be null)
+   * @param tableConfig   the table config containing retention settings
+   * @param taskConfigs   task-level configs; may contain {@link 
#RETENTION_EXPIRY_BUFFER_PERIOD_KEY}. Null if
+   *                      unavailable.
+   * @param currentTimeMs the current time in milliseconds (pass {@code 
System.currentTimeMillis()})
+   * @return filtered list excluding segments past effective retention; 
returns the original list if retention is not
+   *         configured or cannot be parsed
+   */
+  public static List<SegmentZKMetadata> 
filterSegmentsPastRetention(List<SegmentZKMetadata> segments,
+      TableConfig tableConfig, @Nullable Map<String, String> taskConfigs, long 
currentTimeMs) {
+    SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (retentionTimeUnit == null || retentionTimeValue == null) {
+      return segments;
+    }
+
+    long retentionMs;
+    try {
+      retentionMs = 
TimeUnit.valueOf(retentionTimeUnit.toUpperCase()).toMillis(Long.parseLong(retentionTimeValue));
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse retention config for table: {}, skipping 
retention filter",
+          tableConfig.getTableName(), e);
+      return segments;
+    }
+
+    if (retentionMs <= 0) {
+      LOGGER.warn("Retention is non-positive ({}ms) for table: {}, skipping 
retention filter",
+          retentionMs, tableConfig.getTableName());
+      return segments;
+    }
+
+    long bufferMs = 0;
+    if (taskConfigs != null) {
+      String bufferPeriod = 
taskConfigs.get(RETENTION_EXPIRY_BUFFER_PERIOD_KEY);
+      if (bufferPeriod != null) {
+        try {
+          bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+          if (bufferMs < 0) {
+            LOGGER.warn("Invalid retentionExpiryBufferPeriod '{}' for table: 
{}, using 0",
+                bufferPeriod, tableConfig.getTableName());
+            bufferMs = 0;
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Failed to parse retentionExpiryBufferPeriod '{}' for 
table: {}, using 0",
+              bufferPeriod, tableConfig.getTableName(), e);
+        }
+      }
+    }
+
+    long effectiveRetentionMs = retentionMs - bufferMs;
+    if (effectiveRetentionMs <= 0) {
+      LOGGER.warn("retentionExpiryBufferPeriod ({}) >= retention ({}ms) for 
table: {}, skipping retention filter",
+          bufferMs, retentionMs, tableConfig.getTableName());
+      return segments;
+    }
+
+    String tableNameWithType = tableConfig.getTableName();
+    List<SegmentZKMetadata> filtered = new ArrayList<>();
+    int excludedCount = 0;
+    for (SegmentZKMetadata segment : segments) {
+      long endTimeMs = segment.getEndTimeMs();
+      if (!TimeUtils.timeValueInValidRange(endTimeMs)) {

Review Comment:
   For null retention:
   Agreed. This is already handled by the null-guard early return at the top
   of the method; tables without a retention config pass all segments through
   unchanged.
   
   For extracting isPurgeable:
   I looked at this. The two methods answer different questions, with
   intentionally divergent policies around invalid endTimes: isPurgeable
   returns false (don't purge) for invalid endTimes, while this filter passes
   them through (don't exclude from task generation), because if
   RetentionManager won't delete the segment, there's no race to guard
   against. There's also no creationTime fallback here, and no status check
   (callers already guarantee completed segments). Fully unifying them would
   require parameterizing all these differences into a shared API that's
   harder to understand than the two separate methods.
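
To make the difference concrete, here is a minimal sketch of the two policies side by side. It is not the actual Pinot code: `Segment`, `isValidEndTime`, `isPurgeableSketch` and `filterPastRetentionSketch` are hypothetical stand-ins, and the sketch leaves out the creationTime fallback and the status check mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionPolicySketch {

  // Hypothetical stand-in for SegmentZKMetadata; only the fields the sketch needs.
  public static final class Segment {
    final String name;
    final long endTimeMs;

    public Segment(String name, long endTimeMs) {
      this.name = name;
      this.endTimeMs = endTimeMs;
    }
  }

  // Assumption: a simplified validity check standing in for
  // TimeUtils.timeValueInValidRange; the real range check differs.
  static boolean isValidEndTime(long endTimeMs) {
    return endTimeMs > 0;
  }

  // Retention-side question: "may this segment be deleted?"
  // An invalid endTime answers "no" (don't purge).
  public static boolean isPurgeableSketch(Segment segment, long nowMs, long retentionMs) {
    if (!isValidEndTime(segment.endTimeMs)) {
      return false;
    }
    return nowMs - segment.endTimeMs > retentionMs;
  }

  // Task-generation question: "should this segment be kept as a candidate?"
  // An invalid endTime answers "yes" (pass through), because a segment that
  // will not be purged cannot be deleted underneath the task.
  public static List<Segment> filterPastRetentionSketch(List<Segment> segments, long nowMs,
      long effectiveRetentionMs) {
    List<Segment> kept = new ArrayList<>();
    for (Segment segment : segments) {
      if (!isValidEndTime(segment.endTimeMs)) {
        kept.add(segment);
        continue;
      }
      if (nowMs - segment.endTimeMs > effectiveRetentionMs) {
        continue; // past effective retention: excluded from task generation
      }
      kept.add(segment);
    }
    return kept;
  }
}
```

In both methods the invalid-endTime segment survives, but for opposite reasons: one says "not purgeable", the other says "still a candidate". A shared helper would need a flag for that case alone, before the other differences are even considered.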
   
   Do you still want me to make the changes in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

