jineshparakh commented on code in PR #18285:
URL: https://github.com/apache/pinot/pull/18285#discussion_r3136023670
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -451,4 +466,111 @@ public static ValidDocIdsType getValidDocIdsType(UpsertConfig upsertConfig, Map<
}
return validDocIdsType;
}
+
+ /**
+ * Filters out segments that are past (or near) the table's retention period. This prevents task generators from
+ * selecting segments that RetentionManager may delete before the task executor downloads them.
+ * <p>
+ * Uses the same retention logic as {@code TimeRetentionStrategy}: a segment is considered expired if
+ * {@code currentTimeMs - endTimeMs > effectiveRetentionMs}, where effectiveRetentionMs is
+ * {@code retentionMs - bufferMs}.
+ * <p>
+ * If {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY} is set in {@code taskConfigs}, the effective retention is reduced
+ * by that amount, excluding segments earlier, before RetentionManager actually deletes them. This is a table-level
+ * config, not a per-merge-level config, because retention itself is table-level.
+ * <p>
+ * <b>Note on hybrid tables:</b> This method reads only the table-level retention config
+ * ({@code segmentsConfig.retentionTimeUnit/Value}). It does not account for hybrid retention strategies that use
+ * the offline table's time boundary. If hybrid retention is enabled (off by default), RetentionManager may use a
+ * different deletion boundary than what this method computes, so the filter may not perfectly match the controller's
+ * deletion decisions for hybrid tables.
+ * <p>
+ * <b>Watermark impact (MergeRollupTask):</b> This filter runs before watermark advancement. If all segments in an
+ * early time bucket are filtered out, the watermark will advance past them permanently. This is a one-way door but
+ * is expected: those segments would be purged by RetentionManager regardless. If this is caused by a misconfigured
+ * {@code retentionExpiryBufferPeriod}, correcting the config will not recover already-skipped buckets.
+ *
+ * @apiNote Callers are expected to pass only completed segments (status DONE or UPLOADED). This method does not
+ * check segment status, unlike {@code TimeRetentionStrategy.isPurgeable()} which skips incomplete segments. All
+ * current callers ({@code UpsertCompactMergeTaskGenerator.getCandidateSegments},
+ * {@code MergeRollupTaskGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable}) already guarantee this.
+ *
+ * @param segments the candidate segments to filter (must not be null)
+ * @param tableConfig the table config containing retention settings
+ * @param taskConfigs task-level configs; may contain {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY}. Null if
+ * unavailable.
+ * @param currentTimeMs the current time in milliseconds (pass {@code System.currentTimeMillis()})
+ * @return filtered list excluding segments past effective retention; returns the original list if retention is not
+ * configured or cannot be parsed
+ */
+ public static List<SegmentZKMetadata> filterSegmentsPastRetention(List<SegmentZKMetadata> segments,
+ TableConfig tableConfig, @Nullable Map<String, String> taskConfigs, long currentTimeMs) {
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+ String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ if (retentionTimeUnit == null || retentionTimeValue == null) {
+ return segments;
+ }
+
+ long retentionMs;
+ try {
+ retentionMs = TimeUnit.valueOf(retentionTimeUnit.toUpperCase()).toMillis(Long.parseLong(retentionTimeValue));
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse retention config for table: {}, skipping retention filter",
+ tableConfig.getTableName(), e);
+ return segments;
+ }
+
+ if (retentionMs <= 0) {
+ LOGGER.warn("Retention is non-positive ({}ms) for table: {}, skipping retention filter",
+ retentionMs, tableConfig.getTableName());
+ return segments;
+ }
+
+ long bufferMs = 0;
+ if (taskConfigs != null) {
+ String bufferPeriod = taskConfigs.get(RETENTION_EXPIRY_BUFFER_PERIOD_KEY);
+ if (bufferPeriod != null) {
+ try {
+ bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+ if (bufferMs < 0) {
+ LOGGER.warn("Invalid retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+ bufferPeriod, tableConfig.getTableName());
+ bufferMs = 0;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+ bufferPeriod, tableConfig.getTableName(), e);
+ }
+ }
+ }
+
+ long effectiveRetentionMs = retentionMs - bufferMs;
+ if (effectiveRetentionMs <= 0) {
+ LOGGER.warn("retentionExpiryBufferPeriod ({}ms) >= retention ({}ms) for table: {}, skipping retention filter",
+ bufferMs, retentionMs, tableConfig.getTableName());
+ return segments;
+ }
+
+ String tableNameWithType = tableConfig.getTableName();
+ List<SegmentZKMetadata> filtered = new ArrayList<>();
+ int excludedCount = 0;
+ for (SegmentZKMetadata segment : segments) {
+ long endTimeMs = segment.getEndTimeMs();
+ if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
Review Comment:
On null retention:
Agreed. It's already handled by the null-guard early return at the top of
the method; those tables pass through all segments unchanged.
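For illustration, here is a minimal sketch of those config-level guards in plain Java. It assumes the retention unit/value arrive as strings and that the buffer has already been converted to milliseconds (the real method parses it with `TimeUtils.convertPeriodToMillis`). `RetentionGuardSketch` and `effectiveRetentionMs` are hypothetical names, not Pinot APIs. An empty result means the filter is skipped and every segment passes through.

```java
import java.util.Locale;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the config-level guards in filterSegmentsPastRetention.
// Plain strings and longs stand in for Pinot's TableConfig and task configs.
final class RetentionGuardSketch {

  // Returns the effective retention in ms, or empty when the filter should be
  // skipped, i.e. every candidate segment passes through unchanged.
  static OptionalLong effectiveRetentionMs(String unit, String value, long bufferMs) {
    if (unit == null || value == null) {
      return OptionalLong.empty(); // no retention configured: nothing to filter
    }
    long retentionMs;
    try {
      retentionMs = TimeUnit.valueOf(unit.toUpperCase(Locale.ROOT)).toMillis(Long.parseLong(value));
    } catch (IllegalArgumentException e) { // unknown unit; NumberFormatException is a subclass
      return OptionalLong.empty(); // unparseable retention: skip the filter
    }
    if (retentionMs <= 0) {
      return OptionalLong.empty(); // non-positive retention: skip the filter
    }
    long effectiveMs = retentionMs - Math.max(bufferMs, 0); // negative buffer treated as 0
    // A buffer >= retention would exclude everything, so the filter is skipped instead.
    return effectiveMs > 0 ? OptionalLong.of(effectiveMs) : OptionalLong.empty();
  }

  public static void main(String[] args) {
    long day = TimeUnit.DAYS.toMillis(1);
    System.out.println(effectiveRetentionMs(null, null, 0));        // retention not configured
    System.out.println(effectiveRetentionMs("DAYS", "7", day));     // 7d retention, 1d buffer
    System.out.println(effectiveRetentionMs("DAYS", "1", 2 * day)); // buffer exceeds retention
  }
}
```

Returning an empty value for every "can't filter" case keeps the fail-open behavior in one place: the caller only needs a single `isPresent()` check before walking the segments.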
On extracting isPurgeable:
I looked at this. The two methods answer different questions, with
intentionally divergent policies around invalid endTimes: isPurgeable returns
false (don't purge) for invalid endTimes, while this filter passes them
through (don't exclude from task generation), because if RetentionManager won't
delete the segment, there's no race to guard against. There's also no
creationTime fallback here, and no status check (callers already guarantee
completed segments). Fully unifying them would require parameterizing all these
differences into a shared API that's harder to understand than the two separate
methods.
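To make the "different questions" point concrete, here is a simplified side-by-side sketch. It is not Pinot's `TimeRetentionStrategy.isPurgeable()`: it omits the status check and any creationTime fallback, and `isValidEndTime` is an assumed stand-in for `TimeUtils.timeValueInValidRange`. All class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified contrast of the two per-segment decisions.
// No status check, no creationTime fallback; the validity bound is assumed.
final class RetentionDecisionSketch {

  // Assumed stand-in for TimeUtils.timeValueInValidRange (sketch only).
  static boolean isValidEndTime(long endTimeMs) {
    return endTimeMs > 0;
  }

  // "Should RetentionManager delete this segment?" (no buffer)
  static boolean wouldPurge(long endTimeMs, long nowMs, long retentionMs) {
    if (!isValidEndTime(endTimeMs)) {
      return false; // invalid end time: don't purge
    }
    return nowMs - endTimeMs > retentionMs;
  }

  // "Should the task generator skip this segment?" Assumes the config-level
  // guards already ensured retentionMs - bufferMs > 0.
  static boolean excludeFromTask(long endTimeMs, long nowMs, long retentionMs, long bufferMs) {
    if (!isValidEndTime(endTimeMs)) {
      return false; // invalid end time: keep it, since there is no deletion race to guard against
    }
    return nowMs - endTimeMs > retentionMs - bufferMs;
  }

  public static void main(String[] args) {
    long day = TimeUnit.DAYS.toMillis(1);
    long now = 100 * day;
    long endTime = now - 6 * day - day / 2; // ended 6.5 days ago
    System.out.println(wouldPurge(endTime, now, 7 * day));
    System.out.println(excludeFromTask(endTime, now, 7 * day, day));
    System.out.println(excludeFromTask(-1, now, 7 * day, day));
  }
}
```

With a 7-day retention and a 1-day buffer, a segment that ended 6.5 days ago is not yet purgeable but is already excluded from task generation. A shared predicate would have to take the buffer, the invalid-endTime policy, and the fallback/status differences as parameters.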
Do you still want me to make the changes in this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.