[ https://issues.apache.org/jira/browse/GOBBLIN-1863?focusedWorklogId=873902&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-873902 ]

ASF GitHub Bot logged work on GOBBLIN-1863:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/23 21:11
            Start Date: 31/Jul/23 21:11
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279876662


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {
+      FlowId flowId = new FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+      FlowSpec spec;
+      try {
+        URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+        spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.get().compileFlow(spec);
+        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(spec);
+          // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+          // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+          // In this case, the current time is used as the flow executionId.
+          flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+              Long.toString(System.currentTimeMillis()));
+
+          String message = "Flow was not compiled successfully.";
+          if (!( spec).getCompilationErrors().isEmpty()) {
+            message = message + " Compilation errors encountered: " + ( spec).getCompilationErrors();
+          }
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+          Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+              new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+          if (flowCompileFailedTimer.isPresent()) {
+            flowCompileFailedTimer.get().stop(flowMetadata);
+          }

Review Comment:
   Rather than copy-pasting what you just defined as `emitFlowCompilationFailureEvent`, can't we call that? (e.g. make the impl `static` in `Orchestrator`, which could also keep an instance method that delegates to it).
   
   That said, when we observe the need to add two new members to `DagMgr` plus "borrow" a chunk of code from `Orchestrator`, it may be worth reviewing whether the legacy abstractions and separations of responsibility are still right for the present and future...
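The refactoring suggested in the review comment could look roughly like the following self-contained Java sketch. It is not Gobblin code: `SimpleFlowSpec`, the `Consumer`-based event sink, and the literal metadata keys are hypothetical stand-ins for `FlowSpec`, the `EventSubmitter`/`TimingEvent` pair, and the `TimingEvent` constants, and `java.util.Optional` replaces the Guava `Optional` that the diff calls `transform` on. It shows a single static implementation in `Orchestrator`, an instance overload that delegates to it, and a `DagManager` that calls the static method instead of duplicating its body.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch of the suggested refactoring; simplified stand-ins only,
// not Gobblin's real FlowSpec/Orchestrator/DagManager classes.
public class CompileFailureEventSketch {

  // Hypothetical metadata keys standing in for the TimingEvent constants.
  static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
  static final String METADATA_MESSAGE = "message";

  /** Stand-in for FlowSpec: flow metadata plus any compilation errors. */
  static final class SimpleFlowSpec {
    private final Map<String, String> metadata;
    private final List<String> compilationErrors;

    SimpleFlowSpec(Map<String, String> metadata, List<String> compilationErrors) {
      this.metadata = metadata;
      this.compilationErrors = compilationErrors;
    }

    Map<String, String> getMetadata() { return metadata; }
    List<String> getCompilationErrors() { return compilationErrors; }
  }

  /** Stand-in for Orchestrator, which owns the single implementation. */
  static class Orchestrator {
    private final Optional<Consumer<Map<String, String>>> eventSink;

    Orchestrator(Optional<Consumer<Map<String, String>>> eventSink) {
      this.eventSink = eventSink;
    }

    /** Static implementation that any caller holding an event sink can reuse. */
    static Map<String, String> emitFlowCompilationFailureEvent(
        Optional<Consumer<Map<String, String>>> eventSink, SimpleFlowSpec spec, long nowMillis) {
      // Copy so the spec's own metadata is not mutated.
      Map<String, String> flowMetadata = new HashMap<>(spec.getMetadata());
      // Scheduled flows may lack a flowExecutionId; fall back to the supplied time.
      flowMetadata.putIfAbsent(FLOW_EXECUTION_ID_FIELD, Long.toString(nowMillis));
      String message = "Flow was not compiled successfully.";
      if (!spec.getCompilationErrors().isEmpty()) {
        message = message + " Compilation errors encountered: " + spec.getCompilationErrors();
      }
      flowMetadata.put(METADATA_MESSAGE, message);
      // Emit only if a sink is configured (mirrors the optional EventSubmitter).
      eventSink.ifPresent(sink -> sink.accept(flowMetadata));
      return flowMetadata;
    }

    /** Instance overload that delegates, so existing call sites stay unchanged. */
    Map<String, String> emitFlowCompilationFailureEvent(SimpleFlowSpec spec) {
      return emitFlowCompilationFailureEvent(eventSink, spec, System.currentTimeMillis());
    }
  }

  /** Stand-in for DagManager: calls the shared static helper instead of copying it. */
  static class DagManager {
    private final Optional<Consumer<Map<String, String>>> eventSink;

    DagManager(Optional<Consumer<Map<String, String>>> eventSink) {
      this.eventSink = eventSink;
    }

    Map<String, String> onCompilationFailure(SimpleFlowSpec spec) {
      return Orchestrator.emitFlowCompilationFailureEvent(eventSink, spec, System.currentTimeMillis());
    }
  }

  public static void main(String[] args) {
    Consumer<Map<String, String>> printer = md -> System.out.println("FLOW_COMPILE_FAILED " + md);
    SimpleFlowSpec spec = new SimpleFlowSpec(
        Collections.singletonMap("flowName", "demo"), Arrays.asList("no path found"));
    new DagManager(Optional.of(printer)).onCompilationFailure(spec);
  }
}
```

Passing the event sink and the clock value in as parameters is what lets the helper be `static`; the instance overload keeps `Orchestrator`'s existing callers untouched, and taking `nowMillis` explicitly also makes the fallback execution id easy to test.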





Issue Time Tracking
-------------------

    Worklog Id:     (was: 873902)
    Time Spent: 1h 20m  (was: 1h 10m)

> Multi-Active Launch Job Related Issues
> --------------------------------------
>
>                 Key: GOBBLIN-1863
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1863
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Urmi Mustafi
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> * DagManager should check leader status before calling addDag, because calling this method from non-leader hosts throws an NPE, which may cause a failed DAG event to be emitted.
> * Also handle LAUNCH-type events upon a leader change, when a new participant DagManager is set to active. Failing to handle these events may be causing missed flow launches on any leader change or restart.
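The two items in the issue description can be sketched together in plain Java. This is a simplified, hypothetical model rather than the actual `DagManager`: the in-memory action list stands in for the `DagActionStore`, the `DagActionType` values are assumed for illustration, compilation is skipped, and `addDag` simply returns `false` on an inactive (non-leader) instance instead of reaching the code path that throws the NPE. When the instance becomes active, stored `LAUNCH` actions are replayed through `handleLaunchFlowEvent` so they are not lost after a leader change or restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the two fixes described in the issue; not the
// real Gobblin DagManager, DagActionStore, or flow-compilation code.
public class LeaderAwareDagManagerSketch {

  // Assumed action types, for illustration only.
  enum DagActionType { KILL, RESUME, LAUNCH }

  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final DagActionType type;

    DagAction(String flowGroup, String flowName, DagActionType type) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.type = type;
    }
  }

  // In-memory stand-in for actions persisted in the DagActionStore.
  private final List<DagAction> storedActions;
  private volatile boolean isActive = false;
  final List<String> launchedFlows = new ArrayList<>();
  final List<String> skippedFlows = new ArrayList<>();

  LeaderAwareDagManagerSketch(List<DagAction> storedActions) {
    this.storedActions = new ArrayList<>(storedActions);
  }

  /** On activation (e.g. leader change or restart), replay stored LAUNCH actions. */
  synchronized void setActive(boolean active) {
    this.isActive = active;
    if (active) {
      for (DagAction action : storedActions) {
        if (action.type == DagActionType.LAUNCH) {
          handleLaunchFlowEvent(action);
        }
      }
    }
  }

  void handleLaunchFlowEvent(DagAction action) {
    // The real method compiles the FlowSpec into a DAG first; here the flow id stands in.
    addDag(action.flowGroup + "." + action.flowName);
  }

  /** Checks leader status first; the issue reports an NPE when this runs on a non-leader host. */
  boolean addDag(String dagId) {
    if (!isActive) {
      skippedFlows.add(dagId);
      return false;
    }
    launchedFlows.add(dagId);
    return true;
  }

  public static void main(String[] args) {
    LeaderAwareDagManagerSketch mgr = new LeaderAwareDagManagerSketch(Arrays.asList(
        new DagAction("g", "f1", DagActionType.LAUNCH),
        new DagAction("g", "f2", DagActionType.KILL)));
    mgr.addDag("g.early");  // ignored: this instance is not yet active
    mgr.setActive(true);    // replays the stored LAUNCH action for g.f1
    System.out.println("launched=" + mgr.launchedFlows + " skipped=" + mgr.skippedFlows);
  }
}
```

Running `main` prints `launched=[g.f1] skipped=[g.early]`: the early `addDag` call is rejected rather than failing, and only the `LAUNCH` action is replayed on activation.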



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
