[
https://issues.apache.org/jira/browse/GOBBLIN-808?focusedWorklogId=266114&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-266114
]
ASF GitHub Bot logged work on GOBBLIN-808:
------------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jun/19 21:06
Start Date: 24/Jun/19 21:06
Worklog Time Spent: 10m
Work Description: sv2000 commented on pull request #2674: [GOBBLIN-808]
implement azkaban flow cancel when dag manager is enabled
URL: https://github.com/apache/incubator-gobblin/pull/2674#discussion_r296915562
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -187,12 +207,31 @@ protected void startUp() {
synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
- //Add it to the queue of dags
- if (!this.queue.offer(dag)) {
+ long flowExecutionId =
Long.parseLong(dag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ int queueId = (int) (flowExecutionId % this.numThreads);
+ // Add the dag to the specific queue determined by flowExecutionId
+ if (!this.queue[queueId].offer(dag)) {
throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
}
}
+ /**
+ * Method to submit a {@link URI} for canellation requsts to the {@link
DagManager}.
+ * The {@link DagManager} adds the dag to the {@link BlockingQueue} to be
picked up by one of the {@link DagManagerThread}s.
+ */
+ synchronized public void offer(URI uri) {
+ String flowGroup =
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowGroup(uri);
+ String flowName =
FlowConfigResourceLocalHandler.FlowUriUtils.getFlowName(uri);
+
+ List<Long> flowExecutionIds =
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
Review comment:
Can we make the number of executions to cancel configurable with a default
of 1?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 266114)
Time Spent: 1h 50m (was: 1h 40m)
> implement azkaban job cancellation in Gobblin Service
> -----------------------------------------------------
>
> Key: GOBBLIN-808
> URL: https://issues.apache.org/jira/browse/GOBBLIN-808
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)