[
https://issues.apache.org/jira/browse/GOBBLIN-1597?focusedWorklogId=710920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-710920
]
ASF GitHub Bot logged work on GOBBLIN-1597:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jan/22 21:57
Start Date: 18/Jan/22 21:57
Worklog Time Spent: 10m
Work Description: Will-Lo commented on a change in pull request #3452:
URL: https://github.com/apache/gobblin/pull/3452#discussion_r787183303
##########
File path:
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -709,48 +709,55 @@ private void pollAndAdvanceDag() throws IOException,
ExecutionException, Interru
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
- boolean slaKilled = slaKillIfNeeded(node);
-
- JobStatus jobStatus = pollJobStatus(node);
-
- boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
-
- ExecutionStatus status = getJobExecutionStatus(slaKilled,
killOrphanFlow, jobStatus);
+ try {
+ boolean slaKilled = slaKillIfNeeded(node);
- JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(node);
+ JobStatus jobStatus = pollJobStatus(node);
- switch (status) {
- case COMPLETE:
- jobExecutionPlan.setExecutionStatus(COMPLETE);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case FAILED:
- jobExecutionPlan.setExecutionStatus(FAILED);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case CANCELLED:
- jobExecutionPlan.setExecutionStatus(CANCELLED);
- nextSubmitted.putAll(onJobFinish(node));
- nodesToCleanUp.add(node);
- break;
- case PENDING:
- jobExecutionPlan.setExecutionStatus(PENDING);
- break;
- case PENDING_RETRY:
- jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
- break;
- default:
- jobExecutionPlan.setExecutionStatus(RUNNING);
- break;
- }
+ boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+ ExecutionStatus status = getJobExecutionStatus(slaKilled,
killOrphanFlow, jobStatus);
+
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(node);
+
+ switch (status) {
+ case COMPLETE:
+ jobExecutionPlan.setExecutionStatus(COMPLETE);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case FAILED:
+ jobExecutionPlan.setExecutionStatus(FAILED);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case CANCELLED:
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
+ break;
+ case PENDING:
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ break;
+ case PENDING_RETRY:
+ jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+ break;
+ default:
+ jobExecutionPlan.setExecutionStatus(RUNNING);
+ break;
+ }
- if (jobStatus != null && jobStatus.isShouldRetry()) {
- log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
DagManagerUtils.getFullyQualifiedJobName(node),
- jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
- submitJob(node);
- }
+ if (jobStatus != null && jobStatus.isShouldRetry()) {
+ log.info("Retrying job: {}, current attempts: {}, max attempts:
{}", DagManagerUtils.getFullyQualifiedJobName(node),
+ jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ submitJob(node);
+ }
+ } catch (Exception e) {
+ // Error occurred while processing dag, leave dag for next thread
execution to process
+ log.error(String.format("Exception caught in DagManager while
processing dag for flowGroup %s flowName %s due to ",
+
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)),
e);
Review comment:
Yeah, I thought about omitting the flow execution ID, but if we are
searching logs by flowName it would still be caught anyway, so this is better
because we can also search by execution ID.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 710920)
Time Spent: 1h 10m (was: 1h)
> Fix error where DagManagerThread will restart all processing if encountering
> an error
> -------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1597
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1597
> Project: Apache Gobblin
> Issue Type: Task
> Components: gobblin-service
> Affects Versions: 0.16.0
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> If any error occurs during DagManager processing, the thread aborts processing
> of all of its jobs and restarts from the beginning, which can leave
> jobs in a permanently stuck state.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)