[ 
https://issues.apache.org/jira/browse/GOBBLIN-1342?focusedWorklogId=533663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-533663
 ]

ASF GitHub Bot logged work on GOBBLIN-1342:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jan/21 04:06
            Start Date: 10/Jan/21 04:06
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on a change in pull request #3179:
URL: https://github.com/apache/incubator-gobblin/pull/3179#discussion_r554219344



##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the 
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the 
job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and it's failed or cancelled nodes to PENDING_RESUME so 
that the flow will be resumed from the point before it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || 
executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that flow SLA will be based on current time 
instead of original flow
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow 
should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link 
ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} 
because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : 
this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+          return;
+        }
+
+        for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
+          JobStatus jobStatus = pollJobStatus(node);
+          if (jobStatus == null || 
jobStatus.getEventName().equals(FAILED.name()) || 
jobStatus.getEventName().equals(CANCELLED.name())) {
+            return;

Review comment:
       Same comment as earlier (should this be continue instead of return?).

##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the 
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the 
job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and it's failed or cancelled nodes to PENDING_RESUME so 
that the flow will be resumed from the point before it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || 
executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that flow SLA will be based on current time 
instead of original flow
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow 
should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link 
ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} 
because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : 
this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {

Review comment:
       Possible NPE here. Is there a possibility that flowStatus is null?

##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -462,6 +501,64 @@ public void run() {
       }
     }
 
+    /**
+     * Begin resuming a dag by setting the status of both the dag and the 
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the 
job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and it's failed or cancelled nodes to PENDING_RESUME so 
that the flow will be resumed from the point before it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || 
executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that flow SLA will be based on current time 
instead of original flow
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow 
should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link 
ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} 
because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : 
this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+          return;

Review comment:
       Should this be continue instead of return? Don't you want to process the 
remaining dags?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 533663)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add API to resume a flow in gaas
> --------------------------------
>
>                 Key: GOBBLIN-1342
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1342
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Jack Moseley
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to