phet commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1408449867


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
     } else {
       _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+      DagActionStore.DagAction launchAction)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagActionStore.get().deleteDagAction(launchAction);

Review Comment:
   Overall, I recognize we're in an interim phase of our multi-leader journey,
   where we have yet to integrate the DagManager with the
   MultiActiveLeaseArbiter. This is only temporary, so let's not over-engineer.
   
   I'm concerned, however, that this PR's approach presumes successful handling
   of non-durable, in-memory state. If the host goes down after the delete but
   before the launch completes, nothing durable remains to re-trigger the launch
   on startup. A safer pattern would be to remove the LAUNCH action from the
   DagActionStore only after the DagManager has *finished* launching the flow
   execution.
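
The ordering suggested above can be sketched roughly as follows. This is only an illustration under stated assumptions: `InMemoryActionStore`, `Launcher`, and `launchThenDelete` are hypothetical stand-ins, not Gobblin's `DagActionStore` or `DagManager` APIs, and the idea that the launcher exposes a completion signal (here a `CompletableFuture`) is an assumption not stated in the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these types are Gobblin's real classes.
final class LaunchActionCleanupSketch {

  /** Stand-in for a durable store of pending actions (assumption). */
  static final class InMemoryActionStore {
    private final Set<String> actions = new HashSet<>();

    synchronized void add(String action) { actions.add(action); }
    synchronized void delete(String action) { actions.remove(action); }
    synchronized boolean contains(String action) { return actions.contains(action); }
  }

  /** Stand-in for the component that launches a flow; the future completes when the launch has finished. */
  interface Launcher {
    CompletableFuture<Void> launch(String action);
  }

  /**
   * Deletes the action only after the launch completes normally.
   * If the launch fails, thenRun is skipped and the action stays in the store,
   * so it can be picked up again later.
   */
  static CompletableFuture<Void> launchThenDelete(Launcher launcher, InMemoryActionStore store, String action) {
    return launcher.launch(action).thenRun(() -> store.delete(action));
  }

  public static void main(String[] args) {
    InMemoryActionStore store = new InMemoryActionStore();

    // Successful launch: the action is removed afterwards.
    store.add("launch-1");
    launchThenDelete(a -> CompletableFuture.completedFuture(null), store, "launch-1");
    System.out.println("launch-1 still stored: " + store.contains("launch-1"));

    // Failed launch: the action is retained for a later retry.
    store.add("launch-2");
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("launch failed"));
    launchThenDelete(a -> failed, store, "launch-2");
    System.out.println("launch-2 still stored: " + store.contains("launch-2"));
  }
}
```

The point of the design is that the durable record outlives the in-memory work: a crash or failure before the launch finishes leaves the action in place rather than silently dropping it.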



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
