[
https://issues.apache.org/jira/browse/GOBBLIN-2142?focusedWorklogId=931805&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931805
]
ASF GitHub Bot logged work on GOBBLIN-2142:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Aug/24 20:15
Start Date: 26/Aug/24 20:15
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1731756895
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeDescendantsFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areAllParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
Review Comment:
This could give unexpected results if a new `ExecutionStatus` were ever
added. Even if other statuses don't call for processing, at least confirm this
status is among the known ones, and fail fast on any other
(as you do with `failureOption` below).
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -309,52 +311,43 @@ public static boolean isDagFinished(Dag<JobExecutionPlan>
dag) {
ExecutionStatus status = node.getValue().getExecutionStatus();
if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
anyFailure = true;
- removeChildrenFromCanRun(node, dag, canRun);
+ removeDescendantsFromCanRun(node, dag, canRun);
completed.add(node);
} else if (status == ExecutionStatus.COMPLETE) {
completed.add(node);
} else if (status == ExecutionStatus.PENDING) {
// Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
// if its parents cannot run
- if (!areParentsInCanRun(node, canRun)) {
+ if (!areAllParentsInCanRun(node, canRun)) {
canRun.remove(node);
}
}
}
- // In the end, check if there are more nodes in canRun than completed
assert canRun.size() >= completed.size();
if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ // In the end, check if there are more nodes in canRun than completed
return canRun.size() == completed.size();
} else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
- //if all the remaining jobs are pending return true
+ // if all the remaining jobs are pending return true
Review Comment:
(I reached out over DM to find out why I'm reading something different.)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
Review Comment:
Suggest moving this declaration down to line 339, where it is first used.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ * If failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated, so even if some job can run, dag
should be considered finished.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it removes all its dependant nodes from
+ `canRun` set. In the end if there are more nodes that "canRun" than
"completed", dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeDescendantsFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areAllParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ // In the end, check if there are more nodes in canRun than completed
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ // if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING,
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
+ return canRun.stream().noneMatch(node ->
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
+ } else {
+ throw new RuntimeException("Unexpected failure option " + failureOption);
+ }
+ }
+
+ private static void
removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node,
Dag<JobExecutionPlan> dag,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+ canRun.remove(child);
+ removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove
all descendants
+ }
+ }
+
+ private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan>
node,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ if (node.getParentNodes() == null) {
+ return true;
+ }
+ for (Dag.DagNode<JobExecutionPlan> parent : node.getParentNodes()) {
+ if (!canRun.contains(parent)) {
+ return false; // If any parent is not in canRun, return false
+ }
+ }
+ return true; // All parents are in canRun
Review Comment:
```
return node.getParentNodes() == null
|| node.getParentNodes().stream().allMatch(parent ->
canRun.contains(parent));
```
Issue Time Tracking
-------------------
Worklog Id: (was: 931805)
Time Spent: 3h (was: 2h 50m)
> find if the dag is running or not correctly
> -------------------------------------------
>
> Key: GOBBLIN-2142
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2142
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)