[ 
https://issues.apache.org/jira/browse/GOBBLIN-2123?focusedWorklogId=928618&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928618
 ]

ASF GitHub Bot logged work on GOBBLIN-2123:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/24 04:30
            Start Date: 03/Aug/24 04:30
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on code in PR #4013:
URL: https://github.com/apache/gobblin/pull/4013#discussion_r1702456107


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -263,6 +262,47 @@ public void testMultipleNextJobToRun() throws Exception {
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
   }
 
+  @Test
+  public void testRetryCurrentFailedJob() throws Exception {
+    String flowName = "fn5";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+    List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
+    dagManagementStateStore.addDag(dag);
+    // a job status with shouldRetry=true
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
+        .jobName("job0").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.FAILED.name())
+        
.startTime(flowExecutionId).shouldRetry(true).orchestratedTime(flowExecutionId).build();
+    doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.of(jobStatus)))
+        .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+
+    ReevaluateDagProc
+        reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new 
DagActionStore.DagAction(flowGroup, flowName,
+        flowExecutionId, "job0", DagActionStore.DagActionType.REEVALUATE), 
null,
+        dagManagementStateStore, mockedDagProcEngineMetrics));
+    reEvaluateDagProc.process(dagManagementStateStore, 
mockedDagProcEngineMetrics);
+
+    int numOfLaunchedJobs = 1; // only the current job
+    // only the current job should have run
+    Mockito.verify(specProducers.get(0), Mockito.times(1)).addSpec(any());
+
+    specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`

Review Comment:
   yes, ok, updated the comment





Issue Time Tracking
-------------------

    Worklog Id:     (was: 928618)
    Time Spent: 0.5h  (was: 20m)

> create reevaluate dag action for jobs in state PENDING_RETRY
> ------------------------------------------------------------
>
>                 Key: GOBBLIN-2123
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2123
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to