[
https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928364
]
ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Aug/24 21:57
Start Date: 01/Aug/24 21:57
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700916336
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec,
Dag<JobExecutionPlan> jobE
}
public void remove(Spec spec, Properties headers) throws IOException {
+ URI uri = spec.getUri();
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all
executions
if (spec instanceof FlowSpec) {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on
SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
- this.dagManager.stopDag(spec.getUri());
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
+ if (this.flowLaunchHandler.isPresent()) {
+ List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+ DagActionStore.DagActionType.KILL);
Review Comment:
I see this is only for delete flowSpec then it makes sense
Issue Time Tracking
-------------------
Worklog Id: (was: 928364)
Time Spent: 1h 20m (was: 1h 10m)
> redirect kill requests to dag proc engine
> ------------------------------------------
>
> Key: GOBBLIN-2121
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)