[ 
https://issues.apache.org/jira/browse/GOBBLIN-1982?focusedWorklogId=898839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-898839
 ]

ASF GitHub Bot logged work on GOBBLIN-1982:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jan/24 02:32
            Start Date: 10/Jan/24 02:32
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3854:
URL: https://github.com/apache/gobblin/pull/3854#discussion_r1446815650


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -463,7 +461,7 @@ private void deleteFromExecutor(Spec spec, Properties headers) {
    Deletes spec from flowCatalog if it is an adhoc flow (not containing a job schedule)
  */
   private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
-    if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+    if (!flowSpec.isScheduled()) {

Review Comment:
   big improvement! :)
   
   did we have this method all along, but just weren't using it here?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
    * @param flowAction uniquely identifies the flow and the present action upon it
    * @param eventTimeMillis is the time this flow action was triggered
    * @param isReminderEvent true if the flow action event we're checking on is a reminder event
+   * @param skipFlowExecutionIdReplacement if true then does not replace the flowExecutionId in the flowAction returned

Review Comment:
   skip... replacement naming seems indirect.  better might be to reverse the 
sense and call it `replaceFlowExecId`.  even better might be 
`adoptConsensusFlowExecutionId`
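
   To make the naming suggestion concrete, here is a minimal sketch of the flag with its sense reversed. Only the names `skipFlowExecutionIdReplacement` and `adoptConsensusFlowExecutionId` come from this review; the `FlowAction` class and `resolve` method are hypothetical stand-ins, not Gobblin's actual types or API.

```java
public class AdoptConsensusSketch {
  // Hypothetical stand-in for a flow action; NOT Gobblin's real DagAction class.
  public static final class FlowAction {
    public final String flowName;
    public final long flowExecutionId;

    public FlowAction(String flowName, long flowExecutionId) {
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  // Positive-sense flag: when true, the returned action carries the consensus ID;
  // when false, the caller's original action is returned untouched.
  public static FlowAction resolve(FlowAction original, long consensusId,
      boolean adoptConsensusFlowExecutionId) {
    return adoptConsensusFlowExecutionId
        ? new FlowAction(original.flowName, consensusId)
        : original;
  }

  public static void main(String[] args) {
    long consensusId = 999L;
    FlowAction adhoc = new FlowAction("adhocFlow", 100L);
    FlowAction scheduled = new FlowAction("scheduledFlow", 200L);

    // Adhoc flow keeps the ID already handed back to the user.
    System.out.println(resolve(adhoc, consensusId, false).flowExecutionId);
    // Scheduled flow adopts the agreed-upon ID.
    System.out.println(resolve(scheduled, consensusId, true).flowExecutionId);
  }
}
```

   With the positive sense, a call site could pass `flowSpec.isScheduled()` directly instead of its negation.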



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -281,10 +267,22 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
           return;
         }
 
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent);
+        // Skip flowExecutionId replacement for adhoc flows
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent,
+            !flowSpec.isScheduled());
         _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
             flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
       } else {
+        Optional<TimingEvent> flowCompilationTimer =
+          this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+        Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =

Review Comment:
   nit: I'd clarify the sense of optional by calling it `compiledDag` or 
`validatedDag`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 898839)
    Time Spent: 1h  (was: 50m)

> Show a consistent flowExecutionId btwn Compilation & Execution 
> ---------------------------------------------------------------
>
>                 Key: GOBBLIN-1982
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1982
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> The problem addressed in this issue is to determine a unique ID per 
> execution that is agreed upon by all hosts and computed before returning any 
> information back to the user (about compilation or execution).
> Upon receiving the request for an adhoc flow, the recipient host creates a 
> flowExecutionId when initializing FlowSpec from config for non-scheduled 
> flows (see 
> [code|https://jarvis.corp.linkedin.com/codesearch/result/?name=FlowConfigResourceLocalHandler.java&path=gobblin-elr%2Fgobblin-restli%2Fgobblin-flow-config-service%2Fgobblin-flow-config-service-server%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fservice&reponame=linkedin%2Fgobblin-elr#276]).
>  This flowExecutionId is returned to the user for tracking the flow status. 
> This should not change later on.
> Scheduled flows are fired upon each host at a different system clock time, so 
> they need a consensus mechanism to coordinate between hosts. During 
> multiActiveLeaseArbitration we update the flowExecutionId of a DagAction with 
> an agreed-upon value from the database to gain this consistency. However, 
> this should only be done for scheduled flows, and before we share any 
> information externally about the flowExecutionId.
> To address the problems above we 
> 1) skip flowExecutionId replacement for adhoc flows
> 2) remove the flow compilation and GTE emission that happened before the 
> consensus on flowExecutionId is reached.
> There's no significant impact of removing this check. It will result in 
> dagActions created for flows that may fail compilation later (after lease 
> arbitration and before execution). Since we already compile the flow on 
> accepting it, we are okay with a slight delay in failing a flow. 
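
The behavior described in the issue can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not Gobblin's implementation: an in-memory map stands in for the shared database used during lease arbitration, "first writer wins" stands in for the real consensus rule, and the class, method, and key names are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConsensusIdSketch {
  // Assumption: an in-memory map stands in for the shared database table.
  private final ConcurrentMap<String, Long> agreedIds = new ConcurrentHashMap<>();

  /**
   * Returns the flowExecutionId a host should use for one trigger of a flow.
   * Scheduled flows: the first host to record its ID wins and the others adopt it.
   * Adhoc flows: the ID already returned to the user is kept unchanged.
   */
  public long resolveFlowExecutionId(String triggerKey, long localId, boolean isScheduled) {
    if (!isScheduled) {
      return localId; // never replace an ID the user may already be tracking
    }
    Long existing = agreedIds.putIfAbsent(triggerKey, localId);
    return existing == null ? localId : existing;
  }

  public static void main(String[] args) {
    ConsensusIdSketch store = new ConsensusIdSketch();

    // Two hosts fire the same scheduled trigger at slightly different clock times.
    long hostA = store.resolveFlowExecutionId("group/flow", 1704853920000L, true);
    long hostB = store.resolveFlowExecutionId("group/flow", 1704853920042L, true);
    System.out.println("hosts agree: " + (hostA == hostB));

    // An adhoc flow keeps the ID created when the request was received.
    long adhoc = store.resolveFlowExecutionId("group/adhoc", 1704853925000L, false);
    System.out.println("adhoc id: " + adhoc);
  }
}
```

In this sketch the adhoc branch returns before touching the shared store, which mirrors item 1 above: the ID handed to the user at submission time is the one used throughout.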



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
