phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1731756895
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeDescendantsFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areAllParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
Review Comment:
This could give unexpected results if a new `ExecutionStatus` were ever
added. Even if the other statuses don't call for any processing, at least confirm
that this status is among the known ones, and fail fast on any other
(as you do with `failureOption` below).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -309,52 +311,43 @@ public static boolean isDagFinished(Dag<JobExecutionPlan>
dag) {
ExecutionStatus status = node.getValue().getExecutionStatus();
if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
anyFailure = true;
- removeChildrenFromCanRun(node, dag, canRun);
+ removeDescendantsFromCanRun(node, dag, canRun);
completed.add(node);
} else if (status == ExecutionStatus.COMPLETE) {
completed.add(node);
} else if (status == ExecutionStatus.PENDING) {
// Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
// if its parents cannot run
- if (!areParentsInCanRun(node, canRun)) {
+ if (!areAllParentsInCanRun(node, canRun)) {
canRun.remove(node);
}
}
}
- // In the end, check if there are more nodes in canRun than completed
assert canRun.size() >= completed.size();
if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ // In the end, check if there are more nodes in canRun than completed
return canRun.size() == completed.size();
} else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
- //if all the remaining jobs are pending return true
+ // if all the remaining jobs are pending return true
Review Comment:
(I reached out over DM to find out why I'm reading something different.)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
Review Comment:
Suggest moving this declaration down to line 339, just before the `if` where `failureOption` is first used.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeDescendantsFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areAllParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ // In the end, check if there are more nodes in canRun than completed
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ // if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING,
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
+ return canRun.stream().noneMatch(node ->
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
+ } else {
+ throw new RuntimeException("Unexpected failure option " + failureOption);
+ }
+ }
+
+ private static void
removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node,
Dag<JobExecutionPlan> dag,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+ canRun.remove(child);
+ removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove
all descendants
+ }
+ }
+
+ private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan>
node,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ if (node.getParentNodes() == null) {
+ return true;
+ }
+ for (Dag.DagNode<JobExecutionPlan> parent : node.getParentNodes()) {
+ if (!canRun.contains(parent)) {
+ return false; // If any parent is not in canRun, return false
+ }
+ }
+ return true; // All parents are in canRun
Review Comment:
```
return node.getParentNodes() == null
|| node.getParentNodes().stream().allMatch(parent ->
canRun.contains(parent));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]