kyoty commented on a change in pull request #5912:
URL: https://github.com/apache/dolphinscheduler/pull/5912#discussion_r689212950
##########
File path:
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -553,14 +555,21 @@ private int createCommand(CommandType commandType, int processDefineId,
}
}
if (!CollectionUtils.isEmpty(listDate)) {
- // loop by schedule date
- for (Date date : listDate) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
+
+ int effectThreadsCount = expectedParallelismNumber == null ? 1 : Math.min(listDate.size(), expectedParallelismNumber);
+ logger.info("In parallel mode, current expectedParallelismNumber:{}", expectedParallelismNumber);
+
+ int average = listDate.size() / effectThreadsCount;
+ int slice = listDate.size() % effectThreadsCount == 0 ? average : average + 1;
+
+ Lists.partition(listDate, slice).stream().forEach(partition -> {
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(partition.get(0)));
+ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(partition.get(partition.size() - 1)));
Review comment:
@zhuangchong Sorry for the delay.
This part of the code is very complicated.
First of all, note that **`CronUtils#getSelfFireDateList` only returns the
time points that satisfy the cron expression within the open interval
(startTime, endTime), so the two endpoints themselves are excluded.**
There are currently two complement modes, and both need to be taken into
consideration:
1. Serial complement: `CronUtils#getSelfFireDateList` is executed only once,
in `MasterExecThread#executeComplementProcess`.
2. Parallel complement: `CronUtils#getSelfFireDateList` is executed twice,
first in `ExecutorServiceImpl#createCommand` and then in
`MasterExecThread#executeComplementProcess`. **Because of the second call, the
`startTime` and `endTime` of each prepared time interval in custom-parallel
mode are discarded.**
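To make the problem concrete, here is a minimal sketch. It does not use the real `CronUtils`; `fireTimesInOpenInterval` is a hypothetical stand-in that only mimics the open-interval behaviour described above, and the fire times are made-up epoch seconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OpenIntervalDemo {

    // Hypothetical stand-in for CronUtils#getSelfFireDateList:
    // keeps only the fire times strictly inside (start, end).
    static List<Long> fireTimesInOpenInterval(List<Long> fireTimes, long start, long end) {
        List<Long> result = new ArrayList<>();
        for (long t : fireTimes) {
            if (t > start && t < end) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed schedule: one fire time every 100 seconds.
        List<Long> fireTimes = Arrays.asList(100L, 200L, 300L, 400L, 500L, 600L);

        // A partition built in createCommand covers the fire times 100, 200, 300,
        // so the command carries start = 100 and end = 300.
        long start = 100L;
        long end = 300L;

        // The second call (in the master) filters on the open interval again
        // and drops both boundary fire times.
        System.out.println(fireTimesInOpenInterval(fireTimes, start, end)); // prints [200]
    }
}
```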
**I dare not modify `CronUtils.getSelfFireDateList`, because changing it
would also affect the serial complement case.**
**To stay compatible with both scenarios, I changed how the time range is
split in parallel mode: if the prepared time interval is [A, B], we widen it
to [A-1, B+1]. That way, after `CronUtils.getSelfFireDateList` has been
executed twice, we end up with the correct interval.**
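The widening can be sketched roughly as follows. This is only an illustration under assumptions: `fireTimesInOpenInterval` is a hypothetical stand-in for `CronUtils.getSelfFireDateList`, the "1" in [A-1, B+1] is taken to be one second, and the schedule is assumed to fire much less often than once per second so that the widened bounds cannot pull in a neighbouring fire time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WidenedIntervalDemo {

    // Hypothetical stand-in for CronUtils.getSelfFireDateList:
    // keeps only the fire times strictly inside (start, end).
    static List<Long> fireTimesInOpenInterval(List<Long> fireTimes, long start, long end) {
        List<Long> result = new ArrayList<>();
        for (long t : fireTimes) {
            if (t > start && t < end) {
                result.add(t);
            }
        }
        return result;
    }

    // Widen the closed interval [a, b] to [a - 1, b + 1] (assumed unit: one second)
    // and then apply the open-interval filter to the widened bounds.
    static List<Long> fireTimesAfterWidening(List<Long> fireTimes, long a, long b) {
        return fireTimesInOpenInterval(fireTimes, a - 1, b + 1);
    }

    public static void main(String[] args) {
        // Assumed schedule: one fire time every 100 seconds.
        List<Long> fireTimes = Arrays.asList(100L, 200L, 300L, 400L, 500L, 600L);

        // The partition [100, 300] becomes (99, 301) for the open-interval filter,
        // so both boundary fire times survive the second call.
        System.out.println(fireTimesAfterWidening(fireTimes, 100L, 300L)); // prints [100, 200, 300]
    }
}
```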
I have now implemented it as described above, and manual testing works well.
Do you have any ideas?