[ 
https://issues.apache.org/jira/browse/GOBBLIN-1863?focusedWorklogId=873913&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-873913
 ]

ASF GitHub Bot logged work on GOBBLIN-1863:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/23 22:40
            Start Date: 31/Jul/23 22:40
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279951090


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed 
event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String> 
flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) 
spec).getCompilationErrors();
+    }
+    _log.warn(message);
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = 
eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }

Review Comment:
   See previous response about Java Optional vs. Google Optional 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 873913)
    Time Spent: 1h 40m  (was: 1.5h)

> Multi-Active Launch Job Related Issues
> --------------------------------------
>
>                 Key: GOBBLIN-1863
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1863
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Urmi Mustafi
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> * DagManager check leader status before addDag bc calling this method from 
> non-leader hosts throws a NPE which may caused failed dag event to be emitted
>  * also handle LAUNCH type events upon leader change and setting a new 
> participant DagManager to be active. Failing to handle these events may be 
> causing missed flow launches on any leader change or restart. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to