Will-Lo commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279761914
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ if (this.specCompiler.isPresent()) {
+ FlowId flowId = new
FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
Review Comment:
Same comment here as the one below: I want to double-check that all the
checks the Orchestrator performs are also done before submitting these flows;
otherwise we will reintroduce bugs that those checks currently guard against.
Is it possible to use a helper or a static function?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -280,20 +290,22 @@ protected void startUp() {
* Note this should only be called from the {@link Orchestrator} or {@link
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist,
boolean setStatus) throws IOException {
- if (persist) {
- //Persist the dag
- this.dagStateStore.writeCheckpoint(dag);
- }
- int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
- // Add the dag to the specific queue determined by flowExecutionId
- // Flow cancellation request has to be forwarded to the same
DagManagerThread where the
- // flow create request was forwarded. This is because Azkaban Exec Id is
stored in the DagNode of the
- // specific DagManagerThread queue
- if (!this.runQueue[queueId].offer(dag)) {
- throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
- }
- if (setStatus) {
- submitEventsAndSetStatus(dag);
+ if (isActive) {
Review Comment:
Stylistic nit: I sometimes prefer inverting the condition and exiting early.
The behavior is exactly the same, but it is often more readable than a lot of
nested if statements.
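To illustrate the early-exit shape, here is a minimal standalone sketch. It is
not the actual DagManager code: `EarlyExitSketch` is a hypothetical class, a
`String` stands in for `Dag<JobExecutionPlan>`, the boolean return value exists
only to make the outcome visible, and silently skipping the dag when the
instance is inactive is an assumption, since the diff hunk does not show that
branch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for DagManager; only the control flow matters here.
final class EarlyExitSketch {
  private final BlockingQueue<String> runQueue;
  private final List<String> checkpoints = new ArrayList<>();
  private final boolean isActive;

  EarlyExitSketch(int queueCapacity, boolean isActive) {
    this.runQueue = new ArrayBlockingQueue<>(queueCapacity);
    this.isActive = isActive;
  }

  // Returns true if the dag was queued, false if this instance is inactive.
  synchronized boolean addDag(String dag, boolean persist) throws IOException {
    // Guard clause: handle the inactive case first and return, so the main
    // path below is not wrapped in an extra level of nesting.
    if (!isActive) {
      return false; // assumption: inactive instances skip the dag
    }
    if (persist) {
      checkpoints.add(dag); // stand-in for dagStateStore.writeCheckpoint(dag)
    }
    if (!runQueue.offer(dag)) {
      throw new IOException("Could not add dag " + dag + " to queue");
    }
    return true;
  }

  int queued() {
    return runQueue.size();
  }

  int checkpointed() {
    return checkpoints.size();
  }
}
```

The body after the guard is the same as it would be inside `if (isActive) {
... }`, just one indentation level shallower.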
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String>
flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ _log.warn(message);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
+ }
+
public void submitFlowToDagManager(FlowSpec flowSpec)
throws IOException {
- submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ emitFlowCompilationFailedEvent(flowSpec,
TimingEventUtils.getFlowMetadata(flowSpec));
+ return;
+ }
+ submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
Review Comment:
I'm wondering why we don't call the `orchestrate()` function directly from the
SpecMonitor instead of `submitFlowToDagManager`, because that function
performs a few more checks, namely:
1. Checking for another execution
2. Ensuring that the specCompiler is ready before compilation (this happens
less often these days with a file-based compiler, but a race condition is
still possible where a flow is compiled at service startup before the
flowgraph is complete).
If this is not possible, we need to abstract these checks into another
function and ensure that they're done before submitting to the DagManager in
any configuration.
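As a rough sketch of what such a shared function could look like: everything
below is hypothetical, including the `FlowSubmissionChecks` name, the `Result`
enum, and the two callbacks, and none of it is existing Gobblin API. The order
of the two checks is also an assumption. The idea is only that both the
Orchestrator and the DagManager path would call one static helper before
compiling and submitting.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Hypothetical helper: one place for the checks that must pass before a flow
// is compiled and handed to the DagManager.
final class FlowSubmissionChecks {
  enum Result { OK, COMPILER_NOT_READY, EXECUTION_ALREADY_RUNNING }

  private FlowSubmissionChecks() {
  }

  // compilerReady: stand-in for "the specCompiler has finished loading".
  // anotherExecutionRunning: stand-in for the "another execution" check.
  static <F> Result check(F flow, BooleanSupplier compilerReady,
      Predicate<F> anotherExecutionRunning) {
    if (!compilerReady.getAsBoolean()) {
      return Result.COMPILER_NOT_READY;
    }
    if (anotherExecutionRunning.test(flow)) {
      return Result.EXECUTION_ALREADY_RUNNING;
    }
    return Result.OK;
  }
}
```

Each caller would then proceed to compilation only when `check(...)` returns
`Result.OK`, so no submission path can skip a check.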