[ 
https://issues.apache.org/jira/browse/GOBBLIN-2142?focusedWorklogId=931648&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931648
 ]

ASF GitHub Bot logged work on GOBBLIN-2142:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Aug/24 05:44
            Start Date: 26/Aug/24 05:44
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1730679368


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);

Review Comment:
   I don't totally understand the logic.  When the node is `COMPLETE`, why keep 
it in `canRun`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan> 
node, Dag<JobExecutionPlan> dag,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      canRun.remove(child);
+      removeChildrenFromCanRun(child, dag, canRun); // Recursively remove all 
descendants
+    }
+  }
+
+  private static boolean areParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,

Review Comment:
   `areAllParentsInCanRun` / `isEveryParentInCanRun`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -219,6 +219,7 @@ public static Set<DagNode<JobExecutionPlan>> 
getNext(Dag<JobExecutionPlan> dag)
         switch (failureOption) {
           case FINISH_RUNNING:
             return new HashSet<>();
+          // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`

Review Comment:
   seems a simple enough change... modify now?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),

Review Comment:
   rather than one single massive test method, could this be the start of a 
second test (e.g. single hop vs. multi)?
   
   (this same advice goes for each of the several other re-assignments to `dag` 
that follow)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;

Review Comment:
   ```
   List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING, 
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
   return canRun.stream().noneMatch(node -> 
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
   ```
   ?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {

Review Comment:
   given the method lives in `DagProcUtils`, why put it into 
`DagManagerUtilsTest`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);

Review Comment:
   is there a specific ordering guarantee for `Dag<T>::getNodes`?  how do 
we know this parent would be executed after the children, such that its 
children don't get added to `canRun`, even though their parent's failure should 
mean the child isn't ready to run?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan> 
node, Dag<JobExecutionPlan> dag,

Review Comment:
   seems this should be named `removeDescendantsFromCanRun`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));

Review Comment:
   didn't the impl have `ORCHESTRATED` as another one?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag2("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag3("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag1("1", flowExecutionId,
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, 
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedWithFinishRunningFailureOption() throws 
URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+    Dag<JobExecutionPlan> dag = buildComplexDag4("1", flowExecutionId, 
DagManager.FailureOption.FINISH_RUNNING.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, 
PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, 
PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> 
statuses) {
+    int i=0;
+    for (ExecutionStatus status : statuses) {
+      dag.getNodes().get(i++).getValue().setExecutionStatus(status);
+    }
+  }
+
+  // This creates a dag like this
+  //  D0  D1  D2  D3
+  //  |   |   | \ |
+  //  D4  D5  |  D6
+  //  |   |  \|
+  //  D7  |   D8
+  //    \ |  /
+  //      D9
+
+  public static Dag<JobExecutionPlan> buildComplexDag1(String id, long 
flowExecutionId,
+      String flowFailureOption, String proxyUser, Config additionalConfig) 
throws URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 4) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 5) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job1"));
+      } if (i == 6) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2,job3"));
+      } else if (i == 7) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job4"));
+      } else if (i == 8) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job5,job2"));
+      } else if (i == 9) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job7,job5,job8"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  public static Dag<JobExecutionPlan> buildComplexDag2(String id, long 
flowExecutionId,

Review Comment:
   the picture for the one before was helpful.  what about something analogous 
here?
   
   (same w/ the others that follow)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 931648)
    Time Spent: 20m  (was: 10m)

> find if the dag is running or not correctly
> -------------------------------------------
>
>                 Key: GOBBLIN-2142
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2142
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to