phet commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279643104


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    _log.warn(message);

Review Comment:
   this wasn't logged before.  is it not enough to return the outcome to the user?  should we also log it internally? (there may be a lot of these...)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {
+      FlowId flowId = new FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());

Review Comment:
   perhaps `DagAction` deserves a `getFlowId()` method?
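
A minimal sketch of what such an accessor might look like. `FlowIdSketch` and `DagActionSketch` are simplified stand-ins invented for illustration, not the real `FlowId` or `DagActionStore.DagAction` classes; only the fluent setters seen in the diff are mirrored.

```java
// Simplified stand-in for the real FlowId, keeping only the fluent setters used in the diff.
final class FlowIdSketch {
  private String flowGroup;
  private String flowName;

  FlowIdSketch setFlowGroup(String flowGroup) {
    this.flowGroup = flowGroup;
    return this;
  }

  FlowIdSketch setFlowName(String flowName) {
    this.flowName = flowName;
    return this;
  }

  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }
}

// Simplified stand-in for DagActionStore.DagAction with the proposed accessor added.
final class DagActionSketch {
  private final String flowGroup;
  private final String flowName;

  DagActionSketch(String flowGroup, String flowName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }

  // Proposed helper: builds the FlowId in one place so callers need not repeat the setter chain.
  FlowIdSketch getFlowId() {
    return new FlowIdSketch().setFlowGroup(flowGroup).setFlowName(flowName);
  }
}
```

With something like this, the first line of `handleLaunchFlowEvent` could shrink to a single `action.getFlowId()` call.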



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {

Review Comment:
   ```
   this.specCompiler.ifPresent(theSpecCompiler -> {
     ...
   });
   ```
   ?
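
A small sketch of that shape, assuming the field is a `java.util.Optional` (the diff does not show its declared type, and a Guava `Optional` would not offer `ifPresent`). `FlowCompiler` is a hypothetical stand-in interface, not Gobblin's compiler API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class IfPresentSketch {
  // Hypothetical stand-in for the spec compiler; not Gobblin's SpecCompiler interface.
  interface FlowCompiler {
    String compileFlow(String flowName);
  }

  private final Optional<FlowCompiler> specCompiler;
  private final List<String> launched = new ArrayList<>();

  IfPresentSketch(Optional<FlowCompiler> specCompiler) {
    this.specCompiler = specCompiler;
  }

  void handleLaunchFlowEvent(String flowName) {
    // The lambda only runs when a compiler is present, so no isPresent()/get() pair is needed.
    specCompiler.ifPresent(theSpecCompiler -> launched.add(theSpecCompiler.compileFlow(flowName)));
  }

  List<String> getLaunched() {
    return launched;
  }
}
```

One thing to keep in mind: the lambda body cannot throw checked exceptions, so the existing `catch` blocks would have to move inside it.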



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    _log.warn(message);
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }

Review Comment:
   if this works (w/o being challenged by non-RuntimeExceptions), it would be more idiomatic:
   ```
   eventSubmitter.transform(...).ifPresent(timer -> timer.stop(fmd));
   ```
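
A hedged sketch of that chain. The `transform` call in the diff suggests `eventSubmitter` is a Guava `Optional`, which has no `ifPresent`, so this version assumes `java.util.Optional` and uses `map` in place of `transform`. `Timer` is a hypothetical stand-in for `TimingEvent`, and the submitter is reduced to a plain `String`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class TimerChainSketch {
  // Hypothetical stand-in for TimingEvent: records the metadata it was stopped with.
  static final class Timer {
    final String name;
    Map<String, String> stoppedWith;

    Timer(String name) {
      this.name = name;
    }

    void stop(Map<String, String> metadata) {
      this.stoppedWith = new HashMap<>(metadata);
    }
  }

  // Builds the timer only if a submitter exists, then stops it without an isPresent()/get() pair.
  static Optional<Timer> emit(Optional<String> submitter, Map<String, String> flowMetadata) {
    Optional<Timer> timer = submitter.map(s -> new Timer(s + ".FlowCompileFailed"));
    timer.ifPresent(t -> t.stop(flowMetadata));
    return timer;
  }
}
```

The caveat about non-RuntimeExceptions matters because the lambda handed to `ifPresent` is a `Consumer`, which cannot throw checked exceptions; if `stop` declared one, the explicit `isPresent()` check would be the simpler option.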



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {
+      FlowId flowId = new FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+      FlowSpec spec;
+      try {
+        URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+        spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.get().compileFlow(spec);
+        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(spec);
+          // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+          // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+          // In this case, the current time is used as the flow executionId.
+          flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+              Long.toString(System.currentTimeMillis()));
+
+          String message = "Flow was not compiled successfully.";
+          if (!( spec).getCompilationErrors().isEmpty()) {
+            message = message + " Compilation errors encountered: " + ( spec).getCompilationErrors();
+          }
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+          Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+              new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+          if (flowCompileFailedTimer.isPresent()) {
+            flowCompileFailedTimer.get().stop(flowMetadata);
+          }
+        }
+        addDag(jobExecutionPlanDag, true, true);
+      } catch (URISyntaxException e) {
+        log.warn("Could not create URI object for flowId {} due to error {}", flowId, e.getMessage());
+      } catch (SpecNotFoundException e) {
+        log.warn("Spec not found for flow group: {} name: {} Exception: {}", action.getFlowGroup(),
+            action.getFlowName(), e);
+      } catch (IOException e) {
+        log.warn("Failed to add Job Execution Plan for flow group: {} name: {} due to error {}",
+            action.getFlowGroup(), action.getFlowName(), e);

Review Comment:
   for consistency, let's have ALL log `flowId`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    if (this.specCompiler.isPresent()) {
+      FlowId flowId = new FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+      FlowSpec spec;
+      try {
+        URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+        spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+        Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.get().compileFlow(spec);
+        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+          Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(spec);
+          // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+          // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+          // In this case, the current time is used as the flow executionId.
+          flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+              Long.toString(System.currentTimeMillis()));
+
+          String message = "Flow was not compiled successfully.";
+          if (!( spec).getCompilationErrors().isEmpty()) {
+            message = message + " Compilation errors encountered: " + ( spec).getCompilationErrors();
+          }
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+          Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+              new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+          if (flowCompileFailedTimer.isPresent()) {
+            flowCompileFailedTimer.get().stop(flowMetadata);
+          }

Review Comment:
   rather than copy-paste what you just defined as `emitFlowCompilationFailedEvent`, can't we call to that?  (e.g. make the impl `static` and then maybe have an `Orchestrator` method that delegates to it).
   
   that said, when we observe the need to add two new members to `DagMgr` plus to "borrow" a chunk of code from `Orchestrator`, it may be worth reviewing whether the legacy abstractions and separations of responsibility are still right for the present and future...
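
One possible shape for the static-plus-delegate idea, sketched under stated assumptions: `CompileFailureEvents` and `OrchestratorSketch` are hypothetical names, the two metadata keys are placeholder strings rather than the real `TimingEvent` constants, and the event submitter is reduced to an optional `Consumer` over the metadata map.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

final class CompileFailureEvents {
  // Placeholder key names; the real constants live in TimingEvent.
  static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
  static final String METADATA_MESSAGE = "message";

  private CompileFailureEvents() {
  }

  // Shared static implementation: fills in the metadata, then hands it to the emitter if one exists.
  static void emit(List<String> compilationErrors, Map<String, String> flowMetadata,
      Optional<Consumer<Map<String, String>>> emitter) {
    // Fall back to the current time when no flowExecutionId was set (e.g. scheduled flows).
    flowMetadata.putIfAbsent(FLOW_EXECUTION_ID_FIELD, Long.toString(System.currentTimeMillis()));
    String message = "Flow was not compiled successfully.";
    if (!compilationErrors.isEmpty()) {
      message = message + " Compilation errors encountered: " + compilationErrors;
    }
    flowMetadata.put(METADATA_MESSAGE, message);
    emitter.ifPresent(e -> e.accept(flowMetadata));
  }
}

// Stand-in for the Orchestrator: its instance method only delegates to the shared helper.
final class OrchestratorSketch {
  private final Optional<Consumer<Map<String, String>>> emitter;

  OrchestratorSketch(Optional<Consumer<Map<String, String>>> emitter) {
    this.emitter = emitter;
  }

  void emitFlowCompilationFailedEvent(List<String> compilationErrors, Map<String, String> flowMetadata) {
    CompileFailureEvents.emit(compilationErrors, flowMetadata, emitter);
  }
}
```

The `DagManager` side could then call the same static method with its own emitter instead of carrying a second copy of the block.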



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    _log.warn(message);
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
   public void submitFlowToDagManager(FlowSpec flowSpec)
       throws IOException {
-    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      emitFlowCompilationFailedEvent(flowSpec, TimingEventUtils.getFlowMetadata(flowSpec));
+      return;
+    }
+    submitFlowToDagManager(flowSpec, jobExecutionPlanDag);

Review Comment:
   I find `else` MUCH preferable to the early return here
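
For illustration, the `if`/`else` form might read as below. `SubmitFlowSketch` is a hypothetical stand-in: the dag is modelled as a plain list and the two outcomes are recorded as strings rather than calling the real emit and submit methods.

```java
import java.util.ArrayList;
import java.util.List;

final class SubmitFlowSketch {
  private final List<String> outcomes = new ArrayList<>();

  // A null or empty dag stands for a failed compilation, as in the diff.
  void submitFlow(String flowName, List<String> compiledDag) {
    if (compiledDag == null || compiledDag.isEmpty()) {
      outcomes.add("compile-failed:" + flowName);
    } else {
      outcomes.add("submitted:" + flowName);
    }
  }

  List<String> getOutcomes() {
    return outcomes;
  }
}
```

Both branches sit at the same indentation level, so the two outcomes read as alternatives rather than as a guard followed by the main path.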



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> flowMetadata) {

Review Comment:
   no biggie, but naming-wise, I agree the timing is emitted... but what may be most noteworthy is that it adds the `METADATA_MESSAGE`


