[ 
https://issues.apache.org/jira/browse/GOBBLIN-2017?focusedWorklogId=922689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-922689
 ]

ASF GitHub Bot logged work on GOBBLIN-2017:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/24 23:05
            Start Date: 07/Jun/24 23:05
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631769843


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -60,21 +59,19 @@ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> ini
     Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> dagNodeWithJobStatus =
         dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
 
-    if (!dagNodeWithJobStatus.getLeft().isPresent() || !dagNodeWithJobStatus.getRight().isPresent()) {
+    if (!dagNodeWithJobStatus.getLeft().isPresent()) {
       // this is possible when MALA malfunctions and a duplicated reevaluate dag proc is launched for a dag node that is
       // already "reevaluated" and cleaned up.
       return ImmutablePair.of(Optional.empty(), Optional.empty());
     }
 
-    ExecutionStatus executionStatus = ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
-    if (!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
-      log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should have been created only for finished status - {}",
-          dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
-      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
-      throw new RuntimeException(String.format("Job status %s is not final for job %s", executionStatus, getDagId()));
+    if (dagNodeWithJobStatus.getRight().isPresent()) {

Review Comment:
   In multi-job cases, at the dag start or in the middle of the flow 
execution (say on the 4th job), whenever there is more than one "next job to 
run", we create a reevaluate dag proc for **those next jobs**. The status of 
those next jobs is not present yet. 
   In the resume case, the status may actually be present, if the dag ran in an 
earlier attempt.
   Did that answer your question?
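
The cases described in the comment can be sketched as a small decision function. This is only an illustration under stated assumptions: `ReevaluateSketch`, `Decision`, `decide`, and the contents of `FINISHED` are hypothetical names and values, not Gobblin code, and plain strings stand in for `Dag.DagNode<JobExecutionPlan>` and `JobStatus`. The body of the new `if (dagNodeWithJobStatus.getRight().isPresent())` branch is not shown in the hunk, so the handling of a present status below is an assumption modeled on the removed finished-status check.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class ReevaluateSketch {
  // Hypothetical stand-in for FlowStatusGenerator.FINISHED_STATUSES; the real set may differ.
  static final Set<String> FINISHED =
      new HashSet<>(Arrays.asList("COMPLETE", "FAILED", "CANCELLED"));

  // Hypothetical outcomes, used only to make the branches visible.
  enum Decision { SKIP_DUPLICATE, PROCEED_WITHOUT_STATUS, PROCEED_FINISHED, REJECT_NOT_FINAL }

  static Decision decide(Optional<String> dagNode, Optional<String> jobStatus) {
    if (!dagNode.isPresent()) {
      // Dag node already reevaluated and cleaned up: a duplicated proc has nothing to do.
      return Decision.SKIP_DUPLICATE;
    }
    if (!jobStatus.isPresent()) {
      // A "next job" that has not run yet has no status; this alone is no longer an early exit.
      return Decision.PROCEED_WITHOUT_STATUS;
    }
    // Status present (for example on resume, from an earlier attempt).
    // Assumption: it is still expected to be a finished status, as in the removed check.
    return FINISHED.contains(jobStatus.get())
        ? Decision.PROCEED_FINISHED
        : Decision.REJECT_NOT_FINAL;
  }

  public static void main(String[] args) {
    System.out.println(decide(Optional.empty(), Optional.empty()));
    System.out.println(decide(Optional.of("job4"), Optional.empty()));
    System.out.println(decide(Optional.of("job4"), Optional.of("COMPLETE")));
    System.out.println(decide(Optional.of("job4"), Optional.of("RUNNING")));
  }
}
```

The point of the sketch is the second branch: before this change a missing status returned early together with a missing dag node, whereas now only a missing dag node does.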





Issue Time Tracking
-------------------

    Worklog Id:     (was: 922689)
    Time Spent: 4h 50m  (was: 4h 40m)

> divide multiple job launches in a LaunchDagProc into multiple LaunchDagActions
> ------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2017
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2017
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Divide multiple job launches in a LaunchDagProc into multiple 
> LaunchDagActions, for two reasons:
> 1) each dag proc then spends less time processing and has a better 
> chance of completing the operation within the lease time
> 2) handling partial job submissions in one LaunchDagProc that sends N jobs is 
> difficult



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
