[ 
https://issues.apache.org/jira/browse/GOBBLIN-1597?focusedWorklogId=710810&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-710810
 ]

ASF GitHub Bot logged work on GOBBLIN-1597:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jan/22 19:46
            Start Date: 18/Jan/22 19:46
    Worklog Time Spent: 10m 
      Work Description: phet commented on a change in pull request #3452:
URL: https://github.com/apache/gobblin/pull/3452#discussion_r787093037



##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -709,48 +709,55 @@ private void pollAndAdvanceDag() throws IOException, 
ExecutionException, Interru
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
       for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
-        boolean slaKilled = slaKillIfNeeded(node);
-
-        JobStatus jobStatus = pollJobStatus(node);
-
-        boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
-
-        ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+        try {
+          boolean slaKilled = slaKillIfNeeded(node);
 
-        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+          JobStatus jobStatus = pollJobStatus(node);
 
-        switch (status) {
-          case COMPLETE:
-            jobExecutionPlan.setExecutionStatus(COMPLETE);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case FAILED:
-            jobExecutionPlan.setExecutionStatus(FAILED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case CANCELLED:
-            jobExecutionPlan.setExecutionStatus(CANCELLED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case PENDING:
-            jobExecutionPlan.setExecutionStatus(PENDING);
-            break;
-          case PENDING_RETRY:
-            jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
-            break;
-          default:
-            jobExecutionPlan.setExecutionStatus(RUNNING);
-            break;
-        }
+          boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+          ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+
+          JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+
+          switch (status) {
+            case COMPLETE:
+              jobExecutionPlan.setExecutionStatus(COMPLETE);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case FAILED:
+              jobExecutionPlan.setExecutionStatus(FAILED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case CANCELLED:
+              jobExecutionPlan.setExecutionStatus(CANCELLED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case PENDING:
+              jobExecutionPlan.setExecutionStatus(PENDING);
+              break;
+            case PENDING_RETRY:
+              jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+              break;
+            default:
+              jobExecutionPlan.setExecutionStatus(RUNNING);
+              break;
+          }
 
-        if (jobStatus != null && jobStatus.isShouldRetry()) {
-          log.info("Retrying job: {}, current attempts: {}, max attempts: {}", 
DagManagerUtils.getFullyQualifiedJobName(node),
-              jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
-          submitJob(node);
-        }
+          if (jobStatus != null && jobStatus.isShouldRetry()) {
+            log.info("Retrying job: {}, current attempts: {}, max attempts: 
{}", DagManagerUtils.getFullyQualifiedJobName(node),
+                jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+            submitJob(node);
+          }
+        } catch (Exception e) {
+            // Error occurred while processing dag, leave dag for next thread 
execution to process

Review comment:
       this is true... but the bigger difference, after this change, is to 
"continue processing other dags assigned to this thread"
   (not an essential comment to leave, but it might help maintainers who are 
reading quickly)

##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -709,48 +709,55 @@ private void pollAndAdvanceDag() throws IOException, 
ExecutionException, Interru
       List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
 
       for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
-        boolean slaKilled = slaKillIfNeeded(node);
-
-        JobStatus jobStatus = pollJobStatus(node);
-
-        boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
-
-        ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+        try {
+          boolean slaKilled = slaKillIfNeeded(node);
 
-        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+          JobStatus jobStatus = pollJobStatus(node);
 
-        switch (status) {
-          case COMPLETE:
-            jobExecutionPlan.setExecutionStatus(COMPLETE);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case FAILED:
-            jobExecutionPlan.setExecutionStatus(FAILED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case CANCELLED:
-            jobExecutionPlan.setExecutionStatus(CANCELLED);
-            nextSubmitted.putAll(onJobFinish(node));
-            nodesToCleanUp.add(node);
-            break;
-          case PENDING:
-            jobExecutionPlan.setExecutionStatus(PENDING);
-            break;
-          case PENDING_RETRY:
-            jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
-            break;
-          default:
-            jobExecutionPlan.setExecutionStatus(RUNNING);
-            break;
-        }
+          boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);
+
+          ExecutionStatus status = getJobExecutionStatus(slaKilled, 
killOrphanFlow, jobStatus);
+
+          JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
+
+          switch (status) {
+            case COMPLETE:
+              jobExecutionPlan.setExecutionStatus(COMPLETE);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case FAILED:
+              jobExecutionPlan.setExecutionStatus(FAILED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case CANCELLED:
+              jobExecutionPlan.setExecutionStatus(CANCELLED);
+              nextSubmitted.putAll(onJobFinish(node));
+              nodesToCleanUp.add(node);
+              break;
+            case PENDING:
+              jobExecutionPlan.setExecutionStatus(PENDING);
+              break;
+            case PENDING_RETRY:
+              jobExecutionPlan.setExecutionStatus(PENDING_RETRY);
+              break;
+            default:
+              jobExecutionPlan.setExecutionStatus(RUNNING);
+              break;
+          }
 
-        if (jobStatus != null && jobStatus.isShouldRetry()) {
-          log.info("Retrying job: {}, current attempts: {}, max attempts: {}", 
DagManagerUtils.getFullyQualifiedJobName(node),
-              jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
-          submitJob(node);
-        }
+          if (jobStatus != null && jobStatus.isShouldRetry()) {
+            log.info("Retrying job: {}, current attempts: {}, max attempts: 
{}", DagManagerUtils.getFullyQualifiedJobName(node),
+                jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+            submitJob(node);
+          }
+        } catch (Exception e) {
+            // Error occurred while processing dag, leave dag for next thread 
execution to process
+            log.error(String.format("Exception caught in DagManager while 
processing dag for flowGroup %s flowName %s due to ",
+                
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+                
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)),
 e);

Review comment:
       maybe `DagManagerUtils.getFullyQualifiedJobName` or `.getFlowId` (if you 
don't also want the exec id in there)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 710810)
    Time Spent: 1h  (was: 50m)

> Fix error where DagManagerThread will restart all processing if encountering 
> an error
> -------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1597
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1597
>             Project: Apache Gobblin
>          Issue Type: Task
>          Components: gobblin-service
>    Affects Versions: 0.16.0
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> If there is any error with the DagManager processing, it will abort all jobs 
> processed in the thread and restart from the beginning, which could leave 
> jobs in a permanently stuck state



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to