noob-se7en commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1925925316
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig>
generateTasks(List<TableConfig> tableConfigs) {
long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
- // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode.
WindowStart = watermark. WindowEnd =
- // windowStart + bucket.
- long windowStartMs = getWatermarkMs(realtimeTableName,
completedSegmentsZKMetadata, bucketMs);
- long windowEndMs = windowStartMs + bucketMs;
+ ZNRecord realtimeToOfflineZNRecord =
+
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ realtimeTableName);
+ int expectedVersion = realtimeToOfflineZNRecord != null ?
realtimeToOfflineZNRecord.getVersion() : -1;
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
+ getRTOTaskMetadata(realtimeTableName,
completedRealtimeSegmentsZKMetadata, bucketMs,
+ realtimeToOfflineZNRecord);
+
+ // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode.
WindowStart = watermark.
+ long windowStartMs =
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
// Find all COMPLETED segments with data overlapping execution window:
windowStart (inclusive) to windowEnd
// (exclusive)
- List<String> segmentNames = new ArrayList<>();
- List<String> downloadURLs = new ArrayList<>();
Set<String> lastLLCSegmentPerPartition = new
HashSet<>(partitionToLatestLLCSegmentName.values());
- boolean skipGenerate = false;
- while (true) {
- // Check that execution window is older than bufferTime
- if (windowEndMs > System.currentTimeMillis() - bufferMs) {
- LOGGER.info(
- "Window with start: {} and end: {} is not older than buffer
time: {} configured as {} ago. Skipping task "
- + "generation: {}", windowStartMs, windowEndMs, bufferMs,
bufferTimePeriod, taskType);
- skipGenerate = true;
- break;
+
+ // Get all offline table segments.
+ // These are used to validate if previous minion task was successful or
not
+ String offlineTableName =
+
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+ Set<String> existingOfflineTableSegmentNames =
+ new
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
true));
+
+ // In-case of previous minion task failures, get info
+ // of failed minion subtasks. They need to be reprocessed.
+ Set<String> failedTaskInputSegments =
+ getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata,
existingOfflineTableSegmentNames);
+
+ // In-case of partial failure of segments upload in prev minion task run,
+ // data is inconsistent, delete the corresponding offline segments
immediately.
+ if (!failedTaskInputSegments.isEmpty()) {
+ deleteInvalidOfflineSegments(offlineTableName,
failedTaskInputSegments, existingOfflineTableSegmentNames,
+ realtimeToOfflineSegmentsTaskMetadata);
+ }
+
+ List<SegmentZKMetadata> segmentsToBeReProcessed =
+ filterOutRemovedSegments(failedTaskInputSegments,
completedRealtimeSegmentsZKMetadata);
+
+ // if no segment to be reprocessed, no failure
+ boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+ List<List<String>> segmentNamesGroupList = new ArrayList<>();
+ Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+ // maxNumRecordsPerTask is used to divide a minion tasks among
+ // multiple subtasks to improve performance.
+ int maxNumRecordsPerTask =
+
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
!= null
+ ? Integer.parseInt(
+
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+ : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+ List<SegmentZKMetadata> segmentsToBeScheduled;
+
+ if (!prevMinionTaskSuccessful) {
+ segmentsToBeScheduled = segmentsToBeReProcessed;
+ } else {
+ // if all offline segments of prev minion tasks were successfully
uploaded,
+ // we can clear the state of prev minion tasks as now it's useless.
+ if
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+ isEmpty()) {
+
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
Review Comment:
Yes.
Since we reach here only when no previous minion task failures are found, in the
next generator run we will evaluate the same condition and reach here again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]