arjun4084346 commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700868425
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec,
Dag<JobExecutionPlan> jobE
}
public void remove(Spec spec, Properties headers) throws IOException {
+ URI uri = spec.getUri();
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all
executions
if (spec instanceof FlowSpec) {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on
SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
- this.dagManager.stopDag(spec.getUri());
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
+ if (this.flowLaunchHandler.isPresent()) {
+ List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+ DagActionStore.DagActionType.KILL);
Review Comment:
yea, it is about killing all flows if any. assuming 10 flows to
cancel...just copied from DagManager code
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]