noob-se7en commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1925930071
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -147,97 +157,385 @@ public List<PinotTaskConfig>
generateTasks(List<TableConfig> tableConfigs) {
long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
- // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode.
WindowStart = watermark. WindowEnd =
- // windowStart + bucket.
- long windowStartMs = getWatermarkMs(realtimeTableName,
completedSegmentsZKMetadata, bucketMs);
- long windowEndMs = windowStartMs + bucketMs;
+ ZNRecord realtimeToOfflineZNRecord =
+
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ realtimeTableName);
+ int expectedVersion = realtimeToOfflineZNRecord != null ?
realtimeToOfflineZNRecord.getVersion() : -1;
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
+ getRTOTaskMetadata(realtimeTableName,
completedRealtimeSegmentsZKMetadata, bucketMs,
+ realtimeToOfflineZNRecord);
+
+ // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode.
WindowStart = watermark.
+ long windowStartMs =
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
// Find all COMPLETED segments with data overlapping execution window:
windowStart (inclusive) to windowEnd
// (exclusive)
- List<String> segmentNames = new ArrayList<>();
- List<String> downloadURLs = new ArrayList<>();
Set<String> lastLLCSegmentPerPartition = new
HashSet<>(partitionToLatestLLCSegmentName.values());
- boolean skipGenerate = false;
- while (true) {
- // Check that execution window is older than bufferTime
- if (windowEndMs > System.currentTimeMillis() - bufferMs) {
- LOGGER.info(
- "Window with start: {} and end: {} is not older than buffer
time: {} configured as {} ago. Skipping task "
- + "generation: {}", windowStartMs, windowEndMs, bufferMs,
bufferTimePeriod, taskType);
- skipGenerate = true;
- break;
+
+ // Get all offline table segments.
+ // These are used to validate if previous minion task was successful or
not
+ String offlineTableName =
+
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(realtimeTableName));
+ Set<String> existingOfflineTableSegmentNames =
+ new
HashSet<>(_clusterInfoAccessor.getPinotHelixResourceManager().getSegmentsFor(offlineTableName,
true));
+
+ // In-case of previous minion task failures, get info
+ // of failed minion subtasks. They need to be reprocessed.
+ Set<String> failedTaskInputSegments =
+ getFailedTaskSegments(realtimeToOfflineSegmentsTaskMetadata,
existingOfflineTableSegmentNames);
+
+ // In-case of partial failure of segments upload in prev minion task run,
+ // data is inconsistent, delete the corresponding offline segments
immediately.
+ if (!failedTaskInputSegments.isEmpty()) {
+ deleteInvalidOfflineSegments(offlineTableName,
failedTaskInputSegments, existingOfflineTableSegmentNames,
+ realtimeToOfflineSegmentsTaskMetadata);
+ }
+
+ List<SegmentZKMetadata> segmentsToBeReProcessed =
+ filterOutRemovedSegments(failedTaskInputSegments,
completedRealtimeSegmentsZKMetadata);
+
+ // if no segment to be reprocessed, no failure
+ boolean prevMinionTaskSuccessful = segmentsToBeReProcessed.isEmpty();
+
+ List<List<String>> segmentNamesGroupList = new ArrayList<>();
+ Map<String, String> segmentNameVsDownloadURL = new HashMap<>();
+
+ // maxNumRecordsPerTask is used to divide a minion tasks among
+ // multiple subtasks to improve performance.
+ int maxNumRecordsPerTask =
+
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY)
!= null
+ ? Integer.parseInt(
+
taskConfigs.get(MinionConstants.RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_TASK_KEY))
+ : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
+
+ List<SegmentZKMetadata> segmentsToBeScheduled;
+
+ if (!prevMinionTaskSuccessful) {
+ segmentsToBeScheduled = segmentsToBeReProcessed;
+ } else {
+ // if all offline segments of prev minion tasks were successfully
uploaded,
+ // we can clear the state of prev minion tasks as now it's useless.
+ if
(!realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().
+ isEmpty()) {
+
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID().clear();
+
realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap().clear();
+ // windowEndTime of prev minion task needs to be re-used for picking
up the
+ // next windowStartTime. This is useful for case where user changes
minion config
+ // after a minion task run was complete. So windowStartTime cannot
be watermark + bucketMs
+ windowStartMs =
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
}
+ long windowEndMs = windowStartMs + bucketMs;
+ // since window changed, pick new segments.
+ segmentsToBeScheduled =
+ generateNewSegmentsToProcess(completedRealtimeSegmentsZKMetadata,
windowStartMs, windowEndMs, bucketMs,
+ bufferMs, bufferTimePeriod, lastLLCSegmentPerPartition,
realtimeToOfflineSegmentsTaskMetadata);
+ }
- for (SegmentZKMetadata segmentZKMetadata :
completedSegmentsZKMetadata) {
- String segmentName = segmentZKMetadata.getSegmentName();
- long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
- long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
- // Check overlap with window
- if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs <
windowEndMs) {
- // If last completed segment is being used, make sure that segment
crosses over end of window.
- // In the absence of this check, CONSUMING segments could contain
some portion of the window. That data
- // would be skipped forever.
- if (lastLLCSegmentPerPartition.contains(segmentName) &&
segmentEndTimeMs < windowEndMs) {
- LOGGER.info("Window data overflows into CONSUMING segments for
partition of segment: {}. Skipping task "
- + "generation: {}", segmentName, taskType);
- skipGenerate = true;
- break;
- }
- segmentNames.add(segmentName);
- downloadURLs.add(segmentZKMetadata.getDownloadUrl());
+ divideSegmentsAmongSubtasks(segmentsToBeScheduled,
segmentNamesGroupList, segmentNameVsDownloadURL,
+ maxNumRecordsPerTask);
+
+ if (segmentNamesGroupList.isEmpty()) {
+ continue;
+ }
+
+ List<PinotTaskConfig> pinotTaskConfigsForTable = new ArrayList<>();
+ long newWindowStartTime =
realtimeToOfflineSegmentsTaskMetadata.getWindowStartMs();
+ long newWindowEndTime =
realtimeToOfflineSegmentsTaskMetadata.getWindowEndMs();
+
+ LOGGER.info(
+ "generating tasks for: {} with window start time: {}, window end
time: {}, table: {}", taskType,
+ windowStartMs,
+ newWindowEndTime, realtimeTableName);
+
+ for (List<String> segmentNameList : segmentNamesGroupList) {
+ List<String> downloadURLList = getDownloadURLList(segmentNameList,
segmentNameVsDownloadURL);
+ Preconditions.checkState(segmentNameList.size() ==
downloadURLList.size());
+ pinotTaskConfigsForTable.add(
+ createPinotTaskConfig(segmentNameList, downloadURLList,
realtimeTableName, taskConfigs, tableConfig,
+ newWindowStartTime,
+ newWindowEndTime, taskType));
+ }
+ try {
+ _clusterInfoAccessor
+ .setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
+ MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ expectedVersion);
+ } catch (ZkBadVersionException e) {
+ LOGGER.error(
+ "Version changed while updating RTO task metadata for table: {},
skip scheduling. There are "
+ + "multiple task schedulers for the same table, need to
investigate!", realtimeTableName);
+ // skip this table for this minion run
+ continue;
+ }
+
+ pinotTaskConfigs.addAll(pinotTaskConfigsForTable);
+
+ LOGGER.info("Finished generating task configs for table: {} for task:
{}", realtimeTableName, taskType);
+ }
+ return pinotTaskConfigs;
+ }
+
+ @Override
+ public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
+ // check table is not upsert
+ Preconditions.checkState(tableConfig.getUpsertMode() ==
UpsertConfig.Mode.NONE,
+ "RealtimeToOfflineTask doesn't support upsert table!");
+ // check no malformed period
+ TimeUtils.convertPeriodToMillis(
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY,
"2d"));
+ TimeUtils.convertPeriodToMillis(
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY,
"1d"));
+ TimeUtils.convertPeriodToMillis(
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
"1s"));
+ // check mergeType is correct
+ Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(),
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
+
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
MergeType.CONCAT.name())
+ .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP,
DEDUP]!");
+
+ Schema schema =
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+ // check no mis-configured columns
+ Set<String> columnNames = schema.getColumnNames();
+ for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+ if (entry.getKey().endsWith(".aggregationType")) {
+ Preconditions.checkState(columnNames.contains(
+ StringUtils.removeEnd(entry.getKey(),
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX)),
+ String.format("Column \"%s\" not found in schema!",
entry.getKey()));
+ try {
+ // check that it's a valid aggregation function type
+ AggregationFunctionType aft =
AggregationFunctionType.getAggregationFunctionType(entry.getValue());
+ // check that a value aggregator is available
+ if
(!MinionConstants.RealtimeToOfflineSegmentsTask.AVAILABLE_CORE_VALUE_AGGREGATORS.contains(aft))
{
+ throw new IllegalArgumentException("ValueAggregator not enabled
for type: " + aft.toString());
}
+ } catch (IllegalArgumentException e) {
+ String err =
+ String.format("Column \"%s\" has invalid aggregate type: %s",
entry.getKey(), entry.getValue());
+ throw new IllegalStateException(err);
}
- if (skipGenerate || !segmentNames.isEmpty()) {
- break;
- }
+ }
+ }
+ }
- LOGGER.info("Found no eligible segments for task: {} with window [{} -
{}), moving to the next time bucket",
- taskType, windowStartMs, windowEndMs);
- windowStartMs = windowEndMs;
- windowEndMs += bucketMs;
+ private List<String> getDownloadURLList(List<String> segmentNameList,
Map<String, String> segmentNameVsDownloadURL) {
+ List<String> downloadURLList = new ArrayList<>();
+ for (String segmentName : segmentNameList) {
+ downloadURLList.add(segmentNameVsDownloadURL.get(segmentName));
+ }
+ return downloadURLList;
+ }
+
+ private void deleteInvalidOfflineSegments(String offlineTableName,
+ Set<String> realtimeSegmentsToBeReProcessed,
+ Set<String> existingOfflineTableSegmentNames,
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata) {
+
+ Map<String, String> segmentNameToExpectedSubtaskResultID =
+
realtimeToOfflineSegmentsTaskMetadata.getSegmentNameToExpectedSubtaskResultID();
+ Map<String, ExpectedSubtaskResult> expectedSubtaskResultMap =
+ realtimeToOfflineSegmentsTaskMetadata.getExpectedSubtaskResultMap();
+
+ Set<String> segmentsToBeDeleted = new HashSet<>();
+
+ for (String realtimeSegmentName : realtimeSegmentsToBeReProcessed) {
+ String id =
segmentNameToExpectedSubtaskResultID.get(realtimeSegmentName);
+ Preconditions.checkNotNull(id);
+ ExpectedSubtaskResult expectedSubtaskResult =
+ expectedSubtaskResultMap.get(id);
+ // if already marked as failure, no need to delete again.
+ if (expectedSubtaskResult.isTaskFailure()) {
Review Comment:
This check is needed and is referenced in the Executor:
` Preconditions.checkState(prevExpectedSubtaskResult.isTaskFailure(),
"ExpectedSubtaskResult can only be replaced if it's of a failed
task");`
Let's say there were 2 consecutive failures in RTO for a realtime segment.
The Executor will try to update the expectedSubtaskResult for a segment in the
map only if the existing entry is marked as failed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]