kyoty commented on a change in pull request #5912:
URL: https://github.com/apache/dolphinscheduler/pull/5912#discussion_r689212950



##########
File path: dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
##########
@@ -553,14 +555,21 @@ private int createCommand(CommandType commandType, int processDefineId,
                         }
                     }
                     if (!CollectionUtils.isEmpty(listDate)) {
-                        // loop by schedule date
-                        for (Date date : listDate) {
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
+
+                        int effectThreadsCount = expectedParallelismNumber == null ? 1 : Math.min(listDate.size(), expectedParallelismNumber);
+                        logger.info("In parallel mode, current expectedParallelismNumber:{}", expectedParallelismNumber);
+
+                        int average = listDate.size() / effectThreadsCount;
+                        int slice = listDate.size() % effectThreadsCount == 0 ? average : average + 1;
+
+                        Lists.partition(listDate, slice).stream().forEach(partition -> {
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(partition.get(0)));
+                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(partition.get(partition.size() - 1)));

Review comment:
       @zhuangchong Sorry for the delay.
   
   This part of the code is a little confusing.
   
   First, note that **`CronUtils#getSelfFireDateList` only returns the time points that satisfy the cron expression within the open time interval (startTime, endTime).**
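   To make the open-interval behavior concrete, here is a minimal plain-Java sketch. It does not use Quartz or the real `CronUtils`: the hypothetical `nextFireAfter` helper stands in for a cron expression that fires every day at 00:00, and the hypothetical `fireDatesOpen` stands in for `CronUtils#getSelfFireDateList`, assuming the open-interval semantics described above.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.util.ArrayList;
   import java.util.List;

   public class OpenIntervalFireDates {
       // Hypothetical schedule: fires every day at 00:00 (stand-in for a cron expression).
       static final LocalDateTime ANCHOR = LocalDateTime.of(2021, 8, 1, 0, 0);
       static final long STEP_SECONDS = Duration.ofDays(1).getSeconds();

       // First fire time strictly after t.
       static LocalDateTime nextFireAfter(LocalDateTime t) {
           long k = Math.floorDiv(Duration.between(ANCHOR, t).getSeconds(), STEP_SECONDS) + 1;
           return ANCHOR.plusSeconds(k * STEP_SECONDS);
       }

       // Fire times strictly inside (start, end): the assumed open-interval semantics.
       static List<LocalDateTime> fireDatesOpen(LocalDateTime start, LocalDateTime end) {
           List<LocalDateTime> result = new ArrayList<>();
           for (LocalDateTime t = nextFireAfter(start); t.isBefore(end); t = nextFireAfter(t)) {
               result.add(t);
           }
           return result;
       }

       public static void main(String[] args) {
           // Both endpoints are fire times themselves, but neither is returned.
           System.out.println(fireDatesOpen(
                   LocalDateTime.of(2021, 8, 1, 0, 0), LocalDateTime.of(2021, 8, 5, 0, 0)));
           // prints [2021-08-02T00:00, 2021-08-03T00:00, 2021-08-04T00:00]
       }
   }
   ```

   Even though 2021-08-01 00:00 and 2021-08-05 00:00 are both fire times, only the three dates strictly between them come back.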
   
   There are currently two complement modes, and both need to be taken into account:
   
   1. Serial complement: `CronUtils#getSelfFireDateList` is executed only once, in `MasterExecThread#executeComplementProcess`.
   
   2. Parallel complement: `CronUtils#getSelfFireDateList` is executed twice, first in `ExecutorServiceImpl#createCommand` and then in `MasterExecThread#executeComplementProcess`. The first and last fire dates of each partition are passed on as the `startTime` and `endTime` of the second call, so **running it twice causes the `startTime` and `endTime` of each time interval prepared for execution in custom-parallel mode to be discarded**.
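   The parallel case can be simulated with the same kind of stand-in. In this sketch, `fireDatesOpen` again assumes a hypothetical daily schedule and open-interval semantics, `partition` mimics Guava's `Lists.partition`, and the hypothetical `runParallel` mirrors the slicing in the diff (slice size = ceil(n / threads)) followed by one second call per partition, roughly what happens between `createCommand` and `executeComplementProcess`.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.util.ArrayList;
   import java.util.List;

   public class DoubleFireDateCall {
       // Hypothetical schedule: fires every day at 00:00 (stand-in for a cron expression).
       static final LocalDateTime ANCHOR = LocalDateTime.of(2021, 8, 1, 0, 0);
       static final long STEP_SECONDS = Duration.ofDays(1).getSeconds();

       // Fire times strictly inside (start, end); assumed semantics of CronUtils#getSelfFireDateList.
       static List<LocalDateTime> fireDatesOpen(LocalDateTime start, LocalDateTime end) {
           long k = Math.floorDiv(Duration.between(ANCHOR, start).getSeconds(), STEP_SECONDS) + 1;
           List<LocalDateTime> result = new ArrayList<>();
           for (LocalDateTime t = ANCHOR.plusSeconds(k * STEP_SECONDS); t.isBefore(end); t = t.plusSeconds(STEP_SECONDS)) {
               result.add(t);
           }
           return result;
       }

       // Consecutive chunks of at most `size` elements, like Guava's Lists.partition.
       static <T> List<List<T>> partition(List<T> list, int size) {
           List<List<T>> parts = new ArrayList<>();
           for (int i = 0; i < list.size(); i += size) {
               parts.add(list.subList(i, Math.min(i + size, list.size())));
           }
           return parts;
       }

       // First call as in createCommand, then one second call per partition. Assumes parallelism >= 1.
       static List<LocalDateTime> runParallel(LocalDateTime start, LocalDateTime end, int parallelism) {
           List<LocalDateTime> listDate = fireDatesOpen(start, end);
           List<LocalDateTime> executed = new ArrayList<>();
           if (listDate.isEmpty()) {
               return executed;
           }
           int threads = Math.min(listDate.size(), parallelism);
           int slice = (listDate.size() + threads - 1) / threads; // ceil(n / threads), same as the diff
           for (List<LocalDateTime> part : partition(listDate, slice)) {
               // The partition's first and last dates become startTime/endTime of the second call.
               executed.addAll(fireDatesOpen(part.get(0), part.get(part.size() - 1)));
           }
           return executed;
       }

       public static void main(String[] args) {
           LocalDateTime start = LocalDateTime.of(2021, 7, 31, 12, 0);
           LocalDateTime end = LocalDateTime.of(2021, 8, 7, 12, 0);
           System.out.println("serial:   " + fireDatesOpen(start, end));
           System.out.println("parallel: " + runParallel(start, end, 2));
       }
   }
   ```

   With seven fire dates (Aug 1 to Aug 7) and a parallelism of 2, the partitions are [Aug 1..Aug 4] and [Aug 5..Aug 7]; the second call only yields Aug 2, Aug 3 and Aug 6, so four of the seven dates are lost.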
   
   **I would rather not modify `CronUtils.getSelfFireDateList`, because changing it would also affect the serial complement case.**
   
   **To stay compatible with all scenarios at the same time, I changed how the time range is split in parallel mode: if the time interval prepared for execution is [A, B], we split it as [A-1, B+1] instead. This way, after `CronUtils.getSelfFireDateList` has run twice, we end up with the correct interval.**
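   A minimal sketch of the widened split, under the same hypothetical daily schedule and open-interval assumption. `MARGIN_SECONDS` (one second here) is an assumed value standing in for the "-1"/"+1": in this simulation it only needs to be positive and smaller than the gap between consecutive fire times, so widening a partition never pulls in a neighboring partition's dates.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.util.ArrayList;
   import java.util.List;

   public class WidenedSplit {
       // Hypothetical schedule: fires every day at 00:00 (stand-in for a cron expression).
       static final LocalDateTime ANCHOR = LocalDateTime.of(2021, 8, 1, 0, 0);
       static final long STEP_SECONDS = Duration.ofDays(1).getSeconds();
       // Assumed widening margin; must be smaller than STEP_SECONDS.
       static final long MARGIN_SECONDS = 1;

       // Fire times strictly inside (start, end); assumed semantics of CronUtils#getSelfFireDateList.
       static List<LocalDateTime> fireDatesOpen(LocalDateTime start, LocalDateTime end) {
           long k = Math.floorDiv(Duration.between(ANCHOR, start).getSeconds(), STEP_SECONDS) + 1;
           List<LocalDateTime> result = new ArrayList<>();
           for (LocalDateTime t = ANCHOR.plusSeconds(k * STEP_SECONDS); t.isBefore(end); t = t.plusSeconds(STEP_SECONDS)) {
               result.add(t);
           }
           return result;
       }

       // Consecutive chunks of at most `size` elements, like Guava's Lists.partition.
       static <T> List<List<T>> partition(List<T> list, int size) {
           List<List<T>> parts = new ArrayList<>();
           for (int i = 0; i < list.size(); i += size) {
               parts.add(list.subList(i, Math.min(i + size, list.size())));
           }
           return parts;
       }

       // Same slicing as before, but each partition [A, B] is passed on as (A - margin, B + margin).
       static List<LocalDateTime> runParallelWidened(LocalDateTime start, LocalDateTime end, int parallelism) {
           List<LocalDateTime> listDate = fireDatesOpen(start, end);
           List<LocalDateTime> executed = new ArrayList<>();
           if (listDate.isEmpty()) {
               return executed;
           }
           int threads = Math.min(listDate.size(), parallelism);
           int slice = (listDate.size() + threads - 1) / threads; // ceil(n / threads)
           for (List<LocalDateTime> part : partition(listDate, slice)) {
               LocalDateTime a = part.get(0).minusSeconds(MARGIN_SECONDS);
               LocalDateTime b = part.get(part.size() - 1).plusSeconds(MARGIN_SECONDS);
               // The open interval (a, b) now still contains the partition's first and last dates.
               executed.addAll(fireDatesOpen(a, b));
           }
           return executed;
       }

       public static void main(String[] args) {
           LocalDateTime start = LocalDateTime.of(2021, 7, 31, 12, 0);
           LocalDateTime end = LocalDateTime.of(2021, 8, 7, 12, 0);
           List<LocalDateTime> serial = fireDatesOpen(start, end);
           List<LocalDateTime> widened = runParallelWidened(start, end, 2);
           System.out.println("same dates as serial: " + serial.equals(widened));
       }
   }
   ```

   With the margin applied, the two-call path returns the same seven dates as a single serial call, for any parallelism from 1 up to the number of dates.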
   
   I've now implemented it as described above, and manual testing works well.
   
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.