noob-se7en commented on code in PR #14623:
URL: https://github.com/apache/pinot/pull/14623#discussion_r1925940698
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -287,77 +585,69 @@ private void getCompletedSegmentsInfo(String
realtimeTableName, List<SegmentZKMe
* If the znode is null, computes the watermark using either the start time
config or the start time from segment
* metadata
*/
- private long getWatermarkMs(String realtimeTableName,
List<SegmentZKMetadata> completedSegmentsZKMetadata,
- long bucketMs) {
- ZNRecord realtimeToOfflineZNRecord =
-
_clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
- realtimeTableName);
- RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
- realtimeToOfflineZNRecord != null ?
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
- realtimeToOfflineZNRecord) : null;
-
- if (realtimeToOfflineSegmentsTaskMetadata == null) {
- // No ZNode exists. Cold-start.
- long watermarkMs;
-
- // Find the smallest time from all segments
- long minStartTimeMs = Long.MAX_VALUE;
- for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
- minStartTimeMs = Math.min(minStartTimeMs,
segmentZKMetadata.getStartTimeMs());
- }
- Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
+ private RealtimeToOfflineSegmentsTaskMetadata getRTOTaskMetadata(String
realtimeTableName,
+ List<SegmentZKMetadata> completedSegmentsZKMetadata,
+ long bucketMs, ZNRecord realtimeToOfflineZNRecord) {
+
+ if (realtimeToOfflineZNRecord != null) {
+ return RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
+ realtimeToOfflineZNRecord);
+ }
- // Round off according to the bucket. This ensures we align the offline
segments to proper time boundaries
- // For example, if start time millis is 20200813T12:34:59, we want to
create the first segment for window
- // [20200813, 20200814)
- watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
+ // No ZNode exists. Cold-start.
+ long watermarkMs;
- // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark
calculated above
- realtimeToOfflineSegmentsTaskMetadata = new
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
-
_clusterInfoAccessor.setMinionTaskMetadata(realtimeToOfflineSegmentsTaskMetadata,
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, -1);
+ // Find the smallest time from all segments
+ long minStartTimeMs = Long.MAX_VALUE;
+ for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+ minStartTimeMs = Math.min(minStartTimeMs,
segmentZKMetadata.getStartTimeMs());
}
- return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
- }
+ Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
- @Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
- // check table is not upsert
- Preconditions.checkState(tableConfig.getUpsertMode() ==
UpsertConfig.Mode.NONE,
- "RealtimeToOfflineTask doesn't support upsert table!");
- // check no malformed period
- TimeUtils.convertPeriodToMillis(
-
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY,
"2d"));
- TimeUtils.convertPeriodToMillis(
-
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY,
"1d"));
- TimeUtils.convertPeriodToMillis(
-
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.ROUND_BUCKET_TIME_PERIOD_KEY,
"1s"));
- // check mergeType is correct
- Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(),
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
-
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
MergeType.CONCAT.name())
- .toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP,
DEDUP]!");
+ // Round off according to the bucket. This ensures we align the offline
segments to proper time boundaries
+ // For example, if start time millis is 20200813T12:34:59, we want to
create the first segment for window
+ // [20200813, 20200814)
+ watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
- Schema schema =
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
- // check no mis-configured columns
- Set<String> columnNames = schema.getColumnNames();
+ return new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName,
watermarkMs);
+ }
+
+ private PinotTaskConfig createPinotTaskConfig(List<String> segmentNameList,
List<String> downloadURLList,
+ String realtimeTableName, Map<String, String> taskConfigs, TableConfig
tableConfig, long windowStartMs,
+ long windowEndMs, String taskType) {
+
+ Map<String, String> configs =
MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs,
+ _clusterInfoAccessor);
+ configs.putAll(getBaseTaskConfigs(tableConfig, segmentNameList));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY,
StringUtils.join(downloadURLList, MinionConstants.URL_SEPARATOR));
Review Comment:
Yes, the order matches.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]