phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1730685464
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ // In the end, check if there are more nodes in canRun than completed
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ //if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ boolean isFinished = true;
+ for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+ if (remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.RUNNING ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RESUME ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.ORCHESTRATED ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RETRY) {
+ isFinished = false;
+ break;
+ }
+ }
+ return isFinished;
Review Comment:
```
List<ExecutionStatus> unfinishedStatuses =
    Lists.newArrayList(RUNNING, PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
return canRun.stream().noneMatch(node ->
    unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
```
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]