abhishekmjain commented on code in PR #4084:
URL: https://github.com/apache/gobblin/pull/4084#discussion_r1881521154


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -139,12 +138,15 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       dagManagementStateStore.updateDagNode(dagNode);
       sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
-      TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
-      String message = "Cannot submit job " + 
DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
-      log.error(message, e);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
-      if (jobFailedTimer != null) {
-        jobFailedTimer.stop(jobMetadata);
+      // Only mark the job as failed in case of non transient exceptions
+      if(!DagProcessingEngine.isTransientException(e)){
+        TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+        String message = "Cannot submit job " + 
DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
+        log.error(message, e);

Review Comment:
   Should we at least log the message, even if it is a transient exception?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -139,12 +138,15 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       dagManagementStateStore.updateDagNode(dagNode);
       sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
     } catch (Exception e) {
-      TimingEvent jobFailedTimer = 
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
-      String message = "Cannot submit job " + 
DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
-      log.error(message, e);
-      jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
-      if (jobFailedTimer != null) {
-        jobFailedTimer.stop(jobMetadata);
+      // Only mark the job as failed in case of non transient exceptions
+      if(!DagProcessingEngine.isTransientException(e)){

Review Comment:
   nit: missing spaces
   `if (!DagProcessingEngine.isTransientException(e)) {`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EventSubmitter.class)
+public class DagProcUtilsTest {
+
+  DagManagementStateStore dagManagementStateStore;
+  SpecExecutor mockSpecExecutor;
+
+  @BeforeClass
+  public void setUp() {
+    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+    mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+  }
+
+  @Test
+  public void testSubmitNextNodesSuccess() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){
+      Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+      dagNodeList.add(dagNode);
+    }
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);

Review Comment:
   Can we add a `Mockito.verify` on `dagManagementStateStore` in all of the tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to