phet commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279876662


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event 
loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a 
completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling 
addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {
+      FlowId flowId = new 
FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+      FlowSpec spec;
+      try {
+        URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+        spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+        Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.get().compileFlow(spec);
+        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+          Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(spec);
+          // For scheduled flows, we do not insert the flowExecutionId into 
the FlowSpec. As a result, if the flow
+          // compilation fails (i.e. we are unable to find a path), the 
metadata will not have flowExecutionId.
+          // In this case, the current time is used as the flow executionId.
+          
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+              Long.toString(System.currentTimeMillis()));
+
+          String message = "Flow was not compiled successfully.";
+          if (!( spec).getCompilationErrors().isEmpty()) {
+            message = message + " Compilation errors encountered: " + ( 
spec).getCompilationErrors();
+          }
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+          Optional<TimingEvent> flowCompileFailedTimer = 
eventSubmitter.transform(submitter ->
+              new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+          if (flowCompileFailedTimer.isPresent()) {
+            flowCompileFailedTimer.get().stop(flowMetadata);
+          }

Review Comment:
   rather than copy-paste what you just defined as 
`emitFlowCompilationFailureEvent`, can't we call to that?  (e.g. make the impl 
`static` in `Orchestrator`, who also maybe has an instance method that 
delegates to it).
   
   that said, when we observe the need to add two new members to `DagMgr` plus 
to "borrow" a chunk of code from `Orchestrator`, it may be worth reviewing 
whether the legacy abstractions and separations of responsibility are still 
right for the present and future...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to