[ 
https://issues.apache.org/jira/browse/GOBBLIN-2142?focusedWorklogId=931805&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931805
 ]

ASF GitHub Bot logged work on GOBBLIN-2142:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Aug/24 20:15
            Start Date: 26/Aug/24 20:15
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1731756895


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   * If failure option is {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag 
should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set 
`canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It 
scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is 
failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than 
"completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that 
all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they 
should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeDescendantsFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areAllParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }

Review Comment:
   this could give unexpected results if a new `ExecutionStatus` were ever 
added.  even if other statuses don't call for processing, at least confirm this 
status is among those known, and fail-fast on any other
   
   (like you do w/ `failureOption` below)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -309,52 +311,43 @@ public static boolean isDagFinished(Dag<JobExecutionPlan> 
dag) {
       ExecutionStatus status = node.getValue().getExecutionStatus();
       if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
         anyFailure = true;
-        removeChildrenFromCanRun(node, dag, canRun);
+        removeDescendantsFromCanRun(node, dag, canRun);
         completed.add(node);
       } else if (status == ExecutionStatus.COMPLETE) {
         completed.add(node);
       } else if (status == ExecutionStatus.PENDING) {
         // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
         // if its parents cannot run
-        if (!areParentsInCanRun(node, canRun)) {
+        if (!areAllParentsInCanRun(node, canRun)) {
           canRun.remove(node);
         }
       }
     }
 
-    // In the end, check if there are more nodes in canRun than completed
     assert canRun.size() >= completed.size();
 
     if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      // In the end, check if there are more nodes in canRun than completed
       return canRun.size() == completed.size();
     } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
-      //if all the remaining jobs are pending return true
+      // if all the remaining jobs are pending return true

Review Comment:
   (I reached out over DM, to find out why I'm reading something different)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   * If failure option is {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag 
should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set 
`canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It 
scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is 
failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than 
"completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that 
all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they 
should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);

Review Comment:
   suggest moving the declaration to line 339



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   * If failure option is {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag 
should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set 
`canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It 
scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is 
failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than 
"completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that 
all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they 
should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeDescendantsFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areAllParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      // In the end, check if there are more nodes in canRun than completed
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      // if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING, 
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
+      return canRun.stream().noneMatch(node -> 
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void 
removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node, 
Dag<JobExecutionPlan> dag,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      canRun.remove(child);
+      removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove 
all descendants
+    }
+  }
+
+  private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> 
node,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    if (node.getParentNodes() == null) {
+      return true;
+    }
+    for (Dag.DagNode<JobExecutionPlan> parent : node.getParentNodes()) {
+      if (!canRun.contains(parent)) {
+        return false; // If any parent is not in canRun, return false
+      }
+    }
+    return true; // All parents are in canRun

Review Comment:
   ```
   return node.getParentNodes() == null
       || node.getParentNodes().stream().allMatch(parent -> 
canRun.contains(parent));
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 931805)
    Time Spent: 3h  (was: 2h 50m)

> find if the dag is running or not correctly
> -------------------------------------------
>
>                 Key: GOBBLIN-2142
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2142
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to