[ https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928361&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928361 ]

ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Aug/24 21:09
            Start Date: 01/Aug/24 21:09
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700868425


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);

Review Comment:
   Yes, this is about killing all executions of the flow, if there are any. It
assumes at most 10 flows to cancel; that limit was just copied from the
DagManager code.
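
   To illustrate the behavior described in this comment, here is a minimal,
standalone sketch of "one kill action per recent execution, capped at 10".
It does not use the real Gobblin classes: `KillActionSketch`, `KillAction`,
`buildKillActions`, and `MAX_EXECUTIONS_TO_KILL` are hypothetical names made
up for this example, and the cap of 10 simply mirrors the literal passed to
`getLatestExecutionIdsForFlow` in the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

class KillActionSketch {
  // Hypothetical stand-in for a KILL dag action; NOT the real DagActionStore.DagAction.
  static final class KillAction {
    final String flowGroup;
    final String flowName;
    final long flowExecutionId;

    KillAction(String flowGroup, String flowName, long flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  // Assumption: same cap as the literal 10 used in the diff above.
  static final int MAX_EXECUTIONS_TO_KILL = 10;

  // Builds one kill action for each of the given execution ids, up to the cap.
  // The ids are assumed to arrive already ordered, most recent first.
  static List<KillAction> buildKillActions(String flowGroup, String flowName,
      List<Long> latestExecutionIds) {
    List<KillAction> actions = new ArrayList<>();
    int limit = Math.min(latestExecutionIds.size(), MAX_EXECUTIONS_TO_KILL);
    for (long flowExecutionId : latestExecutionIds.subList(0, limit)) {
      actions.add(new KillAction(flowGroup, flowName, flowExecutionId));
    }
    return actions;
  }
}
```

   With this shape, a flow that has more than 10 recent executions would only
get kill actions for the first 10 ids in the list, which is the limitation
the comment acknowledges.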





Issue Time Tracking
-------------------

    Worklog Id:     (was: 928361)
    Time Spent: 1h  (was: 50m)

>  redirect kill requests to dag proc engine
> ------------------------------------------
>
>                 Key: GOBBLIN-2121
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
