arjun4084346 commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631589630


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
       return;
     }
 
+    if (dagNodeWithJobStatus.getRight().isPresent()
+        && !FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName())) {
+      // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+      throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+              + " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
+          dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), FlowStatusGenerator.FINISHED_STATUSES));
+    }
+
     Dag.DagNode<JobExecutionPlan> dagNode = dagNodeWithJobStatus.getLeft().get();
+
+    if (!dagNodeWithJobStatus.getRight().isPresent()) {
+      // if the job status is not present, this job was never launched, submit it now
+      submitJobForThisDagNode(dagManagementStateStore, dagNode);
+      return;
+    }

Review Comment:
   It comes down to how we define the responsibilities of the launch and reevaluate DagProcs.
   I see LaunchDagProc's main responsibility as doing the preparation for starting a dag, which includes **creating** a dag out of a flowConfig and bringing the flow into a launchable state.
   I imagine there being **only one** LaunchDagProc in the whole process. If we create multiple LaunchDagProcs, we may again have to add clumsy code to handle those cases and possibly do duplicate "preparation" work.
   
   ReevaluateDagProc comes in handy here because it is not a "LaunchNextJobsDagProc"; it can take on the additional responsibility of a broader reevaluation and may decide to launch the current job too when needed.
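   To make the proposed split concrete, here is a minimal sketch of the three-way branching ReevaluateDagProc performs in the hunk above: no job status means the job was never launched and gets submitted now; a finished status lets reevaluation continue; any other status is treated as an inconsistent state. The class `ReevaluateSketch`, the `Action` enum, the `reevaluate` method, and the contents of the `FINISHED_STATUSES` stand-in are all hypothetical simplifications, not Gobblin's actual types; only the decision logic is taken from the diff.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the decision ReevaluateDagProc makes in the diff.
// None of these names are Gobblin classes; they only mirror the branching logic.
class ReevaluateSketch {

  // Stand-in for FlowStatusGenerator.FINISHED_STATUSES; the real contents are assumed here.
  static final Set<String> FINISHED_STATUSES = Set.of("COMPLETE", "FAILED", "CANCELLED");

  /** Hypothetical outcomes of reevaluating one dag node. */
  enum Action { SUBMIT_CURRENT_JOB, ADVANCE_DAG }

  /**
   * Decides what to do for a dag node given its (possibly absent) job status event name:
   * no status -> the job was never launched, so submit it now;
   * finished status -> continue the broader reevaluation (e.g. launch the next jobs);
   * any other status -> unexpected state, so fail loudly.
   */
  static Action reevaluate(String dagNodeId, Optional<String> eventName) {
    if (eventName.isEmpty()) {
      return Action.SUBMIT_CURRENT_JOB;
    }
    if (!FINISHED_STATUSES.contains(eventName.get())) {
      throw new IllegalStateException(String.format(
          "Job status for dagNode %s is %s; expected no status or one of %s",
          dagNodeId, eventName.get(), FINISHED_STATUSES));
    }
    return Action.ADVANCE_DAG;
  }

  public static void main(String[] args) {
    System.out.println(reevaluate("node-1", Optional.empty()));        // SUBMIT_CURRENT_JOB
    System.out.println(reevaluate("node-2", Optional.of("COMPLETE"))); // ADVANCE_DAG
    try {
      reevaluate("node-3", Optional.of("RUNNING"));
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Keeping the "never launched" branch inside reevaluation is what allows a single LaunchDagProc to own dag creation and preparation without a second launch path.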



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
