umustafi commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1408482374


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
     } else {
       _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+      DagActionStore.DagAction launchAction)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagActionStore.get().deleteDagAction(launchAction);

Review Comment:
   If `DagManager.addDag` completes, then we have already persisted
   ([code](https://github.com/apache/gobblin/blob/35304e9676f82feeb6cc93332707a2bd9cae6b74/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java#L317))
   the `dag` in the MySQL `DagStateStore`, and its status and completion will
   be tracked by existing `DagManager` functionality via a series of checkpoints
   once it's [submitted to executor](https://github.com/apache/gobblin/blob/35304e9676f82feeb6cc93332707a2bd9cae6b74/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java#L1107),
   or fails, or the next dag is called, etc. We load all dags from the state store upon
   startup, and while `DagManager` is active it should keep track of the dag. In the
   pre-multi-active `DagMgr`, I don't believe we necessarily need to proceed any
   further. What do you think?
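
   To make the ordering argument concrete, here is a minimal sketch with hypothetical in-memory stand-ins (the class `LaunchOrderingSketch` and all of its members are illustrative assumptions, not Gobblin's actual `DagManager` or `DagActionStore` APIs). It only shows the idea that the launch action is deleted after the dag is persisted, so a failure before persistence leaves the action in place to be handled again on startup:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins for illustration only; not Gobblin's real classes.
class LaunchOrderingSketch {
  // Stand-in for the persisted dag state store (MySQL-backed in the discussion above).
  final Set<String> persistedDags = new HashSet<>();
  // Stand-in for the store of pending launch actions.
  final Set<String> pendingLaunchActions = new HashSet<>();

  void addLaunchAction(String flowExecutionId) {
    pendingLaunchActions.add(flowExecutionId);
  }

  // Persist the dag first, then delete the launch action.
  // If persisting fails, the action is left behind on purpose.
  void submit(String flowExecutionId, boolean failWhilePersisting) {
    if (failWhilePersisting) {
      throw new IllegalStateException("simulated failure before the dag is persisted");
    }
    persistedDags.add(flowExecutionId);           // dag is now recoverable from the state store
    pendingLaunchActions.remove(flowExecutionId); // safe to drop the action
  }

  // On startup, persisted dags are reloaded from the state store,
  // so only the launch actions still pending need to be replayed.
  Set<String> actionsToReplayOnStartup() {
    return new HashSet<>(pendingLaunchActions);
  }
}
```

   In this sketch a successful `submit` leaves nothing to replay, while a failed one keeps the action pending and persists no dag, which is the property the new `deleteDagAction` call appears to rely on.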



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

