fuchanghai commented on code in PR #14664:
URL:
https://github.com/apache/dolphinscheduler/pull/14664#discussion_r1280039156
##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java:
##########
@@ -384,13 +384,40 @@ public static List<Long> parseConditionTask(Long nodeCode,
// the skipNodeList maybe null if no next task
skipNodeList = Optional.ofNullable(skipNodeList).orElse(new
ArrayList<>());
for (Long failedNode : skipNodeList) {
- setTaskNodeSkip(failedNode, dag, completeTaskList,
skipTaskNodeList);
+ // return nodes set to skip in setTaskNodeSkip method
+ Map<Long, TaskNode> skipNodesAfterFailedNode =
+ setTaskNodeSkip(failedNode, dag, completeTaskList,
skipTaskNodeList);
+ parseConditionNodeAfterSkippedNode(skipTaskNodeList, dag,
conditionTaskList, skipNodesAfterFailedNode);
}
// the conditionTaskList maybe null if no next task
conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new
ArrayList<>());
return conditionTaskList;
}
+ /**
+ * In some special cases, such as the condition task is set to "OR", the
condition task after skipped node needs to be submitted to execution
+ *
+ * @param skipTaskNodeList
+ * @param dag
+ * @param conditionTaskList
+ * @param skipTaskAfterNode
+ */
+ private static void parseConditionNodeAfterSkippedNode(Map<Long, TaskNode>
skipTaskNodeList,
+ DAG<Long, TaskNode,
TaskNodeRelation> dag,
+ List<Long>
conditionTaskList,
+ Map<Long, TaskNode>
skipTaskAfterNode) {
+
+ for (Long taskCode : skipTaskAfterNode.keySet()) {
+ Set<Long> subsequentNodes = dag.getSubsequentNodes(taskCode);
Review Comment:
I understand what's going on now; please ignore my question above.
##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java:
##########
@@ -439,22 +466,26 @@ private static List<Long> skipTaskNode4Switch(TaskNode
taskNode,
/**
* set task node and the post nodes skip flag
+ * Returns some nodes that have been set to skip in this method
*/
- private static void setTaskNodeSkip(Long skipNodeCode,
- DAG<Long, TaskNode, TaskNodeRelation>
dag,
- Map<Long, TaskInstance>
completeTaskList,
- Map<Long, TaskNode> skipTaskNodeList) {
+ private static Map<Long, TaskNode> setTaskNodeSkip(Long skipNodeCode,
+ DAG<Long, TaskNode,
TaskNodeRelation> dag,
+ Map<Long, TaskInstance>
completeTaskList,
+ Map<Long, TaskNode>
skipTaskNodeList) {
+ HashMap<Long, TaskNode> skipNodesInThisMethod = new HashMap<>();
if (!dag.containsNode(skipNodeCode)) {
- return;
+ return skipNodesInThisMethod;
}
skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
+ skipNodesInThisMethod.put(skipNodeCode, dag.getNode(skipNodeCode));
Collection<Long> postNodeList = dag.getSubsequentNodes(skipNodeCode);
for (Long post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
- setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
Review Comment:
If the recursion here is removed, the skip logic will only be applied to the first node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]