phet commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1279643104
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String>
flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ _log.warn(message);
Review Comment:
this wasn't logged before. is it not enough to return the outcome to the
user? should we also log it internally? (there may be a lot...)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ if (this.specCompiler.isPresent()) {
+ FlowId flowId = new
FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
Review Comment:
perhaps `DagAction` deserves a `getFlowId()` method?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ if (this.specCompiler.isPresent()) {
Review Comment:
```
this.specCompiler.ifPresent(theSpecCompiler -> {
...
});
```
?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String>
flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ _log.warn(message);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
Review Comment:
if this works (w/o being challenged by non-RuntimeExceptions), it would be
more idiomatic:
```
eventSubmitter.transform(...).ifPresent(timer -> timer.stop(fmd));
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ if (this.specCompiler.isPresent()) {
+ FlowId flowId = new
FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+ FlowSpec spec;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.get().compileFlow(spec);
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(spec);
+ // For scheduled flows, we do not insert the flowExecutionId into
the FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the
metadata will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!( spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + (
spec).getCompilationErrors();
+ }
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
+ }
+ addDag(jobExecutionPlanDag, true, true);
+ } catch (URISyntaxException e) {
+ log.warn("Could not create URI object for flowId {} due to error {}",
flowId, e.getMessage());
+ } catch (SpecNotFoundException e) {
+ log.warn("Spec not found for flow group: {} name: {} Exception: {}",
action.getFlowGroup(),
+ action.getFlowName(), e);
+ } catch (IOException e) {
+ log.warn("Failed to add Job Execution Plan for flow group: {} name: {}
due to error {}",
+ action.getFlowGroup(), action.getFlowName(), e);
Review Comment:
for consistency, let's have ALL log `flowId`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +470,54 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ if (this.specCompiler.isPresent()) {
+ FlowId flowId = new
FlowId().setFlowGroup(action.getFlowGroup()).setFlowName(action.getFlowName());
+ FlowSpec spec;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.get().compileFlow(spec);
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(spec);
+ // For scheduled flows, we do not insert the flowExecutionId into
the FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the
metadata will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!( spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + (
spec).getCompilationErrors();
+ }
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
Review Comment:
rather than copy-paste what you just defined as
`emitFlowCompilationFailureEvent`, can't we call to that? (e.g. make the impl
`static` and then maybe have an `Orchestrator` method that delegates to it).
that said, when we observe the need to add two new members to `DagMgr` plus
to "borrow" a chunk of code from `Orchestrator`, it may be worth reviewing
whether the legacy abstractions and separations of responsibility are still
right for the present and future...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String>
flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ _log.warn(message);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
+ }
+
public void submitFlowToDagManager(FlowSpec flowSpec)
throws IOException {
- submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ emitFlowCompilationFailedEvent(flowSpec,
TimingEventUtils.getFlowMetadata(flowSpec));
+ return;
+ }
+ submitFlowToDagManager(flowSpec, jobExecutionPlanDag);
Review Comment:
I find `else` MUCH preferable to the early return here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -374,9 +358,41 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void emitFlowCompilationFailedEvent(Spec spec, Map<String, String>
flowMetadata) {
Review Comment:
no biggie, but naming-wise, I agree the timing is emitted... but what may be
most noteworthy is that it adds the `METADATA_MESSAGE`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]