vsinghal85 commented on code in PR #4084:
URL: https://github.com/apache/gobblin/pull/4084#discussion_r1883638016


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+public class DagProcUtilsTest {
+
+  DagManagementStateStore dagManagementStateStore;
+  SpecExecutor mockSpecExecutor;
+
+  @BeforeTest
+  public void setUp() {
+    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+    mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+  }
+
+  @Test
+  public void testSubmitNextNodesSuccess() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){
+      Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+      dagNodeList.add(dagNode);
+    }
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+  }
+
+  @Test
+  public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
+    Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(2)).getDagManagerMetrics();
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).updateDagNode(dagNode);
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
+  }
+
+  @Test(expectedExceptions = RuntimeException.class, dependsOnMethods = 
"testWhenSubmitToExecutorSuccess")
+  public void testWhenSubmitToExecutorGivesRuntimeException() throws 
URISyntaxException, IOException, ExecutionException, InterruptedException{
+    Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
+    Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+    dagNodeList.add(dagNode);
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    SpecProducer<Spec> mockedSpecProducer = 
mockSpecExecutor.getProducer().get();
+    
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
+    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+    
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    Mockito.verify(mockedSpecProducer, 
Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
+    Mockito.verify(dagManagementStateStore, 
Mockito.times(1)).getDagManagerMetrics();
+    Mockito.verify(metrics, 
Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+  }
+
+  private List<JobExecutionPlan> getJobExecutionPlans() throws 
URISyntaxException {
+    Config flowConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName1")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
+    Config flowConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName2")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
+    Config flowConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, 
"flowName3")
+        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
+    List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, 
flowConfig3);
+
+    Config jobConfig1 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName1").build();
+    Config jobConfig2 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName2").build();
+    Config jobConfig3 = 
ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, 
"source:destination:edgeName3").build();
+    List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, 
jobConfig3);
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      Config jobConfig = jobConfigs.get(i);
+      FlowSpec flowSpec = 
FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+      if(i==2){
+        jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec, 
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+            ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, 
ConfigFactory.empty()));
+      }
+      else{
+        jobExecutionPlans.add(new 
JobExecutionPlan.Factory().createPlan(flowSpec, 
jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,

Review Comment:
   corrected



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to