noob-se7en commented on code in PR #18285:
URL: https://github.com/apache/pinot/pull/18285#discussion_r3136327949
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -451,4 +466,111 @@ public static ValidDocIdsType getValidDocIdsType(UpsertConfig upsertConfig, Map<
}
return validDocIdsType;
}
+
+ /**
+ * Filters out segments that are past (or near) the table's retention period. This prevents task generators from
+ * selecting segments that RetentionManager may delete before the task executor downloads them.
+ * <p>
+ * Uses the same retention logic as {@code TimeRetentionStrategy}: a segment is considered expired if
+ * {@code currentTimeMs - endTimeMs > effectiveRetentionMs}, where effectiveRetentionMs is
+ * {@code retentionMs - bufferMs}.
+ * <p>
+ * If {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY} is set in {@code taskConfigs}, the effective retention is reduced
+ * by that amount, excluding segments earlier, before RetentionManager actually deletes them. This is a table-level
+ * config, not a per-merge-level config, because retention itself is table-level.
+ * <p>
+ * <b>Note on hybrid tables:</b> This method reads only the table-level retention config
+ * ({@code segmentsConfig.retentionTimeUnit/Value}). It does not account for hybrid retention strategies that use
+ * the offline table's time boundary. If hybrid retention is enabled (off by default), RetentionManager may use a
+ * different deletion boundary than what this method computes, so the filter may not perfectly match the controller's
+ * deletion decisions for hybrid tables.
+ * <p>
+ * <b>Watermark impact (MergeRollupTask):</b> This filter runs before watermark advancement. If all segments in an
+ * early time bucket are filtered out, the watermark will advance past them permanently. This is a one-way door but
+ * is expected: those segments would be purged by RetentionManager regardless. If this is caused by a misconfigured
+ * {@code retentionExpiryBufferPeriod}, correcting the config will not recover already-skipped buckets.
+ *
+ * @apiNote Callers are expected to pass only completed segments (status DONE or UPLOADED). This method does not
+ * check segment status, unlike {@code TimeRetentionStrategy.isPurgeable()} which skips incomplete segments. All
+ * current callers ({@code UpsertCompactMergeTaskGenerator.getCandidateSegments},
+ * {@code MergeRollupTaskGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable}) already guarantee this.
+ *
+ * @param segments the candidate segments to filter (must not be null)
+ * @param tableConfig the table config containing retention settings
+ * @param taskConfigs task-level configs; may contain {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY}. Null if
+ * unavailable.
+ * @param currentTimeMs the current time in milliseconds (pass {@code System.currentTimeMillis()})
+ * @return filtered list excluding segments past effective retention; returns the original list if retention is not
+ * configured or cannot be parsed
+ */
+ public static List<SegmentZKMetadata> filterSegmentsPastRetention(List<SegmentZKMetadata> segments,
+ TableConfig tableConfig, @Nullable Map<String, String> taskConfigs, long currentTimeMs) {
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+ String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ if (retentionTimeUnit == null || retentionTimeValue == null) {
+ return segments;
+ }
+
+ long retentionMs;
+ try {
+ retentionMs = TimeUnit.valueOf(retentionTimeUnit.toUpperCase()).toMillis(Long.parseLong(retentionTimeValue));
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse retention config for table: {}, skipping retention filter",
+ tableConfig.getTableName(), e);
+ return segments;
+ }
+
+ if (retentionMs <= 0) {
+ LOGGER.warn("Retention is non-positive ({}ms) for table: {}, skipping retention filter",
+ retentionMs, tableConfig.getTableName());
+ return segments;
+ }
+
+ long bufferMs = 0;
+ if (taskConfigs != null) {
+ String bufferPeriod = taskConfigs.get(RETENTION_EXPIRY_BUFFER_PERIOD_KEY);
+ if (bufferPeriod != null) {
+ try {
+ bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+ if (bufferMs < 0) {
+ LOGGER.warn("Invalid retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+ bufferPeriod, tableConfig.getTableName());
+ bufferMs = 0;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to parse retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+ bufferPeriod, tableConfig.getTableName(), e);
+ }
+ }
+ }
+
+ long effectiveRetentionMs = retentionMs - bufferMs;
+ if (effectiveRetentionMs <= 0) {
+ LOGGER.warn("retentionExpiryBufferPeriod ({}) >= retention ({}ms) for table: {}, skipping retention filter",
+ bufferMs, retentionMs, tableConfig.getTableName());
+ return segments;
+ }
+
+ String tableNameWithType = tableConfig.getTableName();
+ List<SegmentZKMetadata> filtered = new ArrayList<>();
+ int excludedCount = 0;
+ for (SegmentZKMetadata segment : segments) {
+ long endTimeMs = segment.getEndTimeMs();
+ if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
Review Comment:
We can have a single util method: `isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata)`. The existing method with the same signature in TimeRetentionStrategy can then delegate to the new util method, and the util's implementation can be the same as the original one.
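The suggestion can be sketched in plain Java. This is a hypothetical, self-contained illustration and not Pinot code.

Several pieces are assumptions:
- `SegmentMeta` stands in for `SegmentZKMetadata`.
- The 1971 to 2071 validity window is assumed to mirror `TimeUtils.timeValueInValidRange`.
- The static `isPurgeable` takes `retentionMs` and `currentTimeMs` as extra parameters. The proposed two-argument signature has no way to read a retention or a clock from a static context.
- The DONE/UPLOADED status check is left out.

In this sketch, `TimeRetentionStrategy.isPurgeable` keeps its two-argument form and delegates to the util. `filterSegmentsPastRetention` calls the same check with the buffer-reduced retention.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RetentionSketch {

  // Hypothetical stand-in for Pinot's SegmentZKMetadata, reduced to the fields this check reads.
  record SegmentMeta(String segmentName, long endTimeMs) {
  }

  // Assumed validity window, modeled on TimeUtils.timeValueInValidRange (1971 to 2071); verify against Pinot.
  static final long VALID_MIN_TIME_MILLIS = Instant.parse("1971-01-01T00:00:00Z").toEpochMilli();
  static final long VALID_MAX_TIME_MILLIS = Instant.parse("2071-01-01T00:00:00Z").toEpochMilli();

  static boolean timeValueInValidRange(long timeMs) {
    return timeMs >= VALID_MIN_TIME_MILLIS && timeMs <= VALID_MAX_TIME_MILLIS;
  }

  // Shared check (hypothetical util). retentionMs and currentTimeMs are extra parameters so that both the
  // strategy and the task-generator filter can call it with their own retention and clock.
  static boolean isPurgeable(String tableNameWithType, SegmentMeta segment, long retentionMs, long currentTimeMs) {
    long endTimeMs = segment.endTimeMs();
    if (!timeValueInValidRange(endTimeMs)) {
      // An out-of-range end time is never treated as expired.
      System.err.printf("Segment: %s of table: %s has invalid end time in millis: %d%n",
          segment.segmentName(), tableNameWithType, endTimeMs);
      return false;
    }
    return currentTimeMs - endTimeMs > retentionMs;
  }

  // Sketch of TimeRetentionStrategy keeping its two-argument signature but delegating to the shared util.
  static final class TimeRetentionStrategy {
    private final long _retentionMs;

    TimeRetentionStrategy(TimeUnit timeUnit, long timeValue) {
      _retentionMs = timeUnit.toMillis(timeValue);
    }

    boolean isPurgeable(String tableNameWithType, SegmentMeta segment) {
      // Qualified call: the nested method of the same name would otherwise shadow the outer static one.
      return RetentionSketch.isPurgeable(tableNameWithType, segment, _retentionMs, System.currentTimeMillis());
    }
  }

  // Keeps every segment the shared check does not consider purgeable under the buffer-reduced retention.
  static List<SegmentMeta> filterSegmentsPastRetention(String tableNameWithType, List<SegmentMeta> segments,
      long effectiveRetentionMs, long currentTimeMs) {
    List<SegmentMeta> kept = new ArrayList<>();
    for (SegmentMeta segment : segments) {
      if (!isPurgeable(tableNameWithType, segment, effectiveRetentionMs, currentTimeMs)) {
        kept.add(segment);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    long now = Instant.parse("2024-06-01T00:00:00Z").toEpochMilli();
    long day = TimeUnit.DAYS.toMillis(1);
    List<SegmentMeta> segments = List.of(
        new SegmentMeta("seg_10d_old", now - 10 * day),
        new SegmentMeta("seg_6d_old", now - 6 * day),
        new SegmentMeta("seg_1d_old", now - day));
    // A 7-day retention minus a 2-day buffer gives a 5-day effective retention.
    long effectiveRetentionMs = TimeUnit.DAYS.toMillis(7) - TimeUnit.DAYS.toMillis(2);
    List<SegmentMeta> kept = filterSegmentsPastRetention("myTable_REALTIME", segments, effectiveRetentionMs, now);
    System.out.println(kept.stream().map(SegmentMeta::segmentName).toList()); // prints [seg_1d_old]
  }
}
```

With a 7-day retention and a 2-day buffer, the filter excludes the 6-day-old segment even though RetentionManager would still keep it. That early exclusion is the purpose of `retentionExpiryBufferPeriod`.

In this sketch, the filter keeps segments with out-of-range end times, because the shared check treats them as not purgeable. The hunk is cut off at that check, so whether the PR keeps or drops such segments is not visible here.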
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.