phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591495305
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
- TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
-
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec, flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ try {
+ TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
+ flowName, flowMetadata);
+
+ if (!compiledDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ return;
+ }
+ Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+ if (compiledDag.isEmpty()) {
+
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec,
+ flowMetadata);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.FAILED);
+ _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
+ return;
+ }
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
- return;
- }
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+ SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
- flowCompilationTimer.stop(flowMetadata);
+
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
+ flowCompilationTimer.stop(flowMetadata);
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
+ // Depending on if DagManager is present, handle execution
+ submitFlowToDagManager(flowSpec, compiledDag);
+ } finally {
+ // remove from the flow catalog, regardless of whether the flow was
successfully validated and permitted to exec (concurrently)
+ this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+ }
Review Comment:
Too bad the diff above doesn't clearly indicate that it was solely an
indentation change to add the `try ... finally` here, the purpose of which is
to ensure FlowCatalog cleanup, come what may.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]