umustafi commented on code in PR #3727:
URL: https://github.com/apache/gobblin/pull/3727#discussion_r1281090842
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -455,6 +471,33 @@ public synchronized void setActive(boolean active) {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ FlowId flowId = action.getFlowId();
+ FlowSpec spec;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = orchestrator.handleChecksBeforeExecution(spec);
Review Comment:
Moving this function to the `Orchestrator` does not actually change the
code pattern here, since it is only a small amount of wrapper code (mainly to
fetch the spec corresponding to the `dagAction`) around the `Orchestrator`
helper functions it relies on. It is clearer to keep it here, since the use
case is `DagManager`-specific.
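
A minimal sketch of the shape being described, to make the "thin wrapper" point concrete. Every type here (`SketchOrchestrator`, `SketchDagManager`, `compileAndCheck`, the string-based spec and dag) is a hypothetical stand-in and not the real Gobblin API; the only thing it is meant to show is that the `DagManager`-side method resolves the spec for the action and then delegates to an `Orchestrator` helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the Orchestrator: owns the compile-and-check helper.
final class SketchOrchestrator {
  // Returns a "dag" only when the spec passes the (trivial, illustrative) check.
  Optional<String> compileAndCheck(String spec) {
    return spec.isEmpty() ? Optional.empty() : Optional.of("dag(" + spec + ")");
  }
}

// Hypothetical stand-in for the DagManager: owns the launch-event entry point.
final class SketchDagManager {
  private final Map<String, String> flowCatalog = new HashMap<>();
  private final SketchOrchestrator orchestrator;

  SketchDagManager(SketchOrchestrator orchestrator) {
    this.orchestrator = orchestrator;
  }

  void registerSpec(String flowId, String spec) {
    flowCatalog.put(flowId, spec);
  }

  // The DagManager-specific part is only the spec lookup; the rest is delegation.
  Optional<String> handleLaunchFlowEvent(String flowId) {
    String spec = flowCatalog.get(flowId);
    if (spec == null) {
      return Optional.empty(); // no spec for this flow, nothing to launch
    }
    return orchestrator.compileAndCheck(spec);
  }
}
```

Under these assumptions, relocating `handleLaunchFlowEvent` into the orchestrator class would move the lookup but leave the call pattern unchanged, which is the argument for keeping it next to its only caller.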