Will-Lo commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1281064022


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +336,114 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec)
+  /**
+   * If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is
+   * successful, retrieve the flowExecutionId from the JobSpec.
+   */
+  public void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata, Dag<JobExecutionPlan> jobExecutionPlanDag) {
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
+            ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+  }
+
+  /**
+   * Checks whether flowSpec disallows concurrent executions and, if so, whether another instance of the flow is
+   * already running; in that case it emits a FLOW_FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public boolean isExecutionPermittedHandler(Config flowConfig, Spec spec, String flowName, String flowGroup)
       throws IOException {
-    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowName, flowGroup, allowConcurrentExecution)) {
+      _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+      conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
+      Instrumented.markMeter(this.skippedFlowsMeter);
+      if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flows, quota may already have been acquired, so release it here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+      if (this.eventSubmitter.isPresent()) {
+        new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Populates the message of a FlowCompileFailed event and emits it on behalf of the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void populateFlowCompilationFailedEventMessage(Spec spec,
+      Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flowExecutionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
+  public Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(FlowSpec flowSpec)

Review Comment:
   Add javadoc explaining what this does
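   This is a minimal sketch of the decision flow in `isExecutionPermittedHandler`, which may help when documenting these pre-execution checks. It is hypothetical throughout:
   - `ConcurrencyGateSketch` is not a Gobblin class.
   - A `Properties` object stands in for the flow `Config`.
   - Strings stand in for DAG start nodes, which are passed in rather than compiled.
   - `job.schedule` is an assumed stand-in for `ConfigurationKeys.JOB_SCHEDULE_KEY`.

   Only the `flow.allowConcurrentExecution` key is taken from the diff's own FLOW_FAILED message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Hypothetical, simplified model of the check in isExecutionPermittedHandler.
 * None of these names are Gobblin classes; DAG start nodes are plain strings.
 */
class ConcurrencyGateSketch {
  // Key quoted in the FLOW_FAILED message of the diff.
  static final String ALLOW_CONCURRENT_KEY = "flow.allowConcurrentExecution";
  // ASSUMPTION: stand-in string for ConfigurationKeys.JOB_SCHEDULE_KEY.
  static final String SCHEDULE_KEY = "job.schedule";

  final Set<String> runningFlows = new HashSet<>();      // "group.name" of flows in flight
  final List<String> releasedQuota = new ArrayList<>();  // start nodes whose quota was returned
  final List<String> emittedEvents = new ArrayList<>();  // stand-in for the event submitter
  private final boolean defaultAllowConcurrent;          // stand-in for flowConcurrencyFlag

  ConcurrencyGateSketch(boolean defaultAllowConcurrent) {
    this.defaultAllowConcurrent = defaultAllowConcurrent;
  }

  boolean isExecutionPermitted(String flowGroup, String flowName, boolean allowConcurrent) {
    return allowConcurrent || !runningFlows.contains(flowGroup + "." + flowName);
  }

  /** Returns true if the caller may proceed; otherwise cleans up and records FLOW_FAILED. */
  boolean check(Properties flowProps, List<String> startNodes, String flowGroup, String flowName) {
    boolean allowConcurrent = Boolean.parseBoolean(flowProps.getProperty(
        ALLOW_CONCURRENT_KEY, Boolean.toString(defaultAllowConcurrent)));
    if (isExecutionPermitted(flowGroup, flowName, allowConcurrent)) {
      return true;
    }
    // Ad-hoc flows (no schedule key) may already hold quota, so release it.
    if (!flowProps.containsKey(SCHEDULE_KEY)) {
      releasedQuota.addAll(startNodes);
    }
    emittedEvents.add("FLOW_FAILED");
    return false;
  }

  public static void main(String[] args) {
    ConcurrencyGateSketch gate = new ConcurrencyGateSketch(false);
    gate.runningFlows.add("g.f");
    Properties adHoc = new Properties();  // no schedule key: treated as ad hoc
    System.out.println(gate.check(adHoc, List.of("job1"), "g", "f"));  // false
    System.out.println(gate.releasedQuota);  // [job1]
  }
}
```

   In this sketch, a scheduled flow that is blocked keeps its quota untouched and only records the failure event. This mirrors the `JOB_SCHEDULE_KEY` branch in the diff.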



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +471,33 @@ public synchronized void setActive(boolean active) {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    FlowId flowId = action.getFlowId();
+    FlowSpec spec;
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = orchestrator.handleChecksBeforeExecution(spec);

Review Comment:
   Since we need to pass the Orchestrator to the DagManager here, which seems like an anti-pattern, I'm thinking it would be easier to have the Orchestrator handle the DagManager launch event on startup. What would the level of effort (LOE) be in that case?
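   Here is one way the suggested inversion could look. It is a minimal sketch built on hypothetical stand-in types: `DagSink`, `SketchDagManager` and `SketchOrchestrator` are not Gobblin classes, and DAGs are modeled as strings. The Orchestrator consumes the launch event and runs its own checks. It then hands the DagManager only a compiled DAG, so the DagManager no longer holds a reference to the Orchestrator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical narrow interface: all the orchestrator needs from the DagManager. */
interface DagSink {
  void addDag(String dag);
}

/** Stand-in DagManager: no reference to the orchestrator or the flow catalog. */
class SketchDagManager implements DagSink {
  final List<String> dags = new ArrayList<>();

  @Override
  public void addDag(String dag) {
    dags.add(dag);
  }
}

/** Stand-in Orchestrator that owns launch events replayed on startup. */
class SketchOrchestrator {
  private final DagSink dagSink;
  // Stand-in for handleChecksBeforeExecution: empty when compilation or checks fail.
  private final Function<String, Optional<String>> checksBeforeExecution;

  SketchOrchestrator(DagSink dagSink, Function<String, Optional<String>> checksBeforeExecution) {
    this.dagSink = dagSink;
    this.checksBeforeExecution = checksBeforeExecution;
  }

  /** Runs the checks, then passes only a successfully compiled DAG to the DagManager. */
  void handleLaunchFlowEvent(String flowId) {
    checksBeforeExecution.apply(flowId).ifPresent(dagSink::addDag);
  }

  public static void main(String[] args) {
    SketchDagManager dagManager = new SketchDagManager();
    SketchOrchestrator orchestrator = new SketchOrchestrator(dagManager,
        flowId -> flowId.startsWith("bad") ? Optional.<String>empty() : Optional.of("dag:" + flowId));
    orchestrator.handleLaunchFlowEvent("flowA");
    orchestrator.handleLaunchFlowEvent("bad-flow");
    System.out.println(dagManager.dags);  // [dag:flowA]
  }
}
```

   In this shape, whatever replays DagActionStore launch actions on activation would call the Orchestrator rather than the DagManager. The DagManager would expose only an addDag-style entry point.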



-- 
This is an automated message from the Apache Git Service.