phet commented on code in PR #3837:
URL: https://github.com/apache/gobblin/pull/3837#discussion_r1409735986


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -335,22 +343,25 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, optionalFlowExecutionId);
+        this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
-      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get(), flowAction);
     } else {
       _log.warn("Flow: {} submitted to dagManager failed to compile and produce a job execution plan dag", flowSpec);
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag,
+      DagActionStore.DagAction launchAction)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager and delete the action after persisting it to avoid redundant execution on start up
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagActionStore.get().deleteDagAction(launchAction);

Review Comment:
   That's great news; we're all set then! Sorry, for some reason I thought
   `addDag` merely added the DAG to a queue, and hadn't recalled that it
   synchronously persists the DAG.
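
   For anyone reading later, here is a minimal sketch of why the ordering in the diff is safe when the persist step is synchronous. Everything in it is a hypothetical stand-in (`InMemoryDagStore`, `InMemoryActionStore`, and `submit` are illustration-only names, not Gobblin classes); it only shows that the launch action is deleted after the DAG is durably stored, so a failure before that point leaves the action in place to be retried.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustration only: these types are assumptions, not the real DagManager or DagActionStore.
class PersistThenDeleteSketch {

  /** Hypothetical stand-in for the durable DAG state store. */
  static class InMemoryDagStore {
    final Map<String, String> persisted = new HashMap<>();

    /** Synchronous persist: when this returns normally, the DAG is stored. */
    void persist(String flowExecutionId, String dag) {
      if (dag == null) {
        throw new IllegalArgumentException("nothing to persist");
      }
      persisted.put(flowExecutionId, dag);
    }
  }

  /** Hypothetical stand-in for the store of pending launch actions. */
  static class InMemoryActionStore {
    final Set<String> pendingLaunches = new HashSet<>();

    void add(String flowExecutionId) {
      pendingLaunches.add(flowExecutionId);
    }

    void delete(String flowExecutionId) {
      pendingLaunches.remove(flowExecutionId);
    }
  }

  /** Persist first, then delete the action; an exception in persist skips the delete. */
  static void submit(String flowExecutionId, String dag,
      InMemoryDagStore dagStore, InMemoryActionStore actionStore) {
    dagStore.persist(flowExecutionId, dag);
    actionStore.delete(flowExecutionId);
  }

  public static void main(String[] args) {
    InMemoryDagStore dagStore = new InMemoryDagStore();
    InMemoryActionStore actionStore = new InMemoryActionStore();

    // Happy path: the DAG is stored and the launch action is cleared.
    actionStore.add("exec-1");
    submit("exec-1", "dag-payload", dagStore, actionStore);
    System.out.println("exec-1 persisted: " + dagStore.persisted.containsKey("exec-1"));
    System.out.println("exec-1 still pending: " + actionStore.pendingLaunches.contains("exec-1"));

    // Failure path: persist throws, so the launch action survives for a later retry.
    actionStore.add("exec-2");
    try {
      submit("exec-2", null, dagStore, actionStore);
    } catch (IllegalArgumentException e) {
      System.out.println("exec-2 still pending: " + actionStore.pendingLaunches.contains("exec-2"));
    }
  }
}
```

   If the add step only enqueued the DAG, deleting the action right after it could lose the launch on a crash between enqueue and persist, which was the original concern.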



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
