[
https://issues.apache.org/jira/browse/GOBBLIN-2181?focusedWorklogId=948182&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-948182
]
ASF GitHub Bot logged work on GOBBLIN-2181:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Dec/24 07:44
Start Date: 13/Dec/24 07:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4084:
URL: https://github.com/apache/gobblin/pull/4084#discussion_r1883470081
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
- TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
- jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
- if (jobFailedTimer != null) {
- jobFailedTimer.stop(jobMetadata);
+ // Only mark the job as failed in case of non transient exceptions
+ if(!DagProcessingEngine.isTransientException(e)) {
Review Comment:
whitespace: `if (!DagProcessingEngine.isTransientException(e)) {`
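Below is a minimal, dependency-free sketch of the guarded failure path in `submitJobToExecutor`, using the `if (` spacing the comment asks for. The hunk is truncated, so this sketch assumes the removed `JOB_FAILED` timing-event logic moves inside the new non-transient guard. `SubmitFailureSketch`, `failureMetadataOrNull` and the `"message"` key (standing in for `TimingEvent.METADATA_MESSAGE`) are hypothetical names. `isTransientException` always returns `false`, which matches the issue's plan to treat every exception as non-transient for now.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: only the control flow mirrors the PR; none of these names are Gobblin APIs.
final class SubmitFailureSketch {

  // Stand-in for DagProcessingEngine.isTransientException: per GOBBLIN-2181,
  // nothing is treated as transient yet.
  static boolean isTransientException(Exception e) {
    return false;
  }

  // Returns the metadata for a JOB_FAILED event, or null if the exception is
  // transient and the job should not be marked as failed.
  static Map<String, String> failureMetadataOrNull(String jobName, String executorUri, Exception e) {
    String message = "Cannot submit job " + jobName + " on executor " + executorUri;
    // Only mark the job as failed in case of non-transient exceptions
    if (!isTransientException(e)) {
      Map<String, String> jobMetadata = new HashMap<>();
      // "message" stands in for TimingEvent.METADATA_MESSAGE
      jobMetadata.put("message", message + " due to " + e.getMessage());
      return jobMetadata;
    }
    return null;
  }
}
```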
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+public class DagProcUtilsTest {
+
+ DagManagementStateStore dagManagementStateStore;
+ SpecExecutor mockSpecExecutor;
+
+ @BeforeTest
+ public void setUp() {
+ dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
+ mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
+ }
+
+ @Test
+ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
+ Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+ for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){
+ Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ }
Review Comment:
```
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
.map(Dag.DagNode<JobExecutionPlan>::new)
.collect(Collectors.toList());
```
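Note that the suggested stream version also needs `import java.util.stream.Collectors;` in the test. The following self-contained sketch shows the same constructor-reference pattern. It uses a hypothetical `DagNodeSketch` in place of `Dag.DagNode` so that it runs without Gobblin on the classpath.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Dag.DagNode<T>; only the wrapping pattern matters here.
final class DagNodeSketch<T> {
  private final T value;

  DagNodeSketch(T value) {
    this.value = value;
  }

  T getValue() {
    return value;
  }

  // Equivalent to the explicit for-loop in the test: wrap each plan in a node, keeping order.
  static <P> List<DagNodeSketch<P>> wrapAll(List<P> plans) {
    return plans.stream()
        .map(DagNodeSketch<P>::new) // constructor reference with an explicit type argument
        .collect(Collectors.toList());
  }
}
```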
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
Review Comment:
do we still need this from L73:
```
this.nonRetryableExceptions = ConfigUtils.getStringList(config,
ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY)
.stream().map(className -> {
try {
return (Class<? extends Exception>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
```
?
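If the config-driven list is kept, here is a sketch of a tidier loader. `Class.asSubclass(Exception.class)` checks the type at runtime instead of relying on the unchecked `(Class<? extends Exception>)` cast, so a misconfigured class name that is not an exception fails fast with a `ClassCastException`. `ExceptionClassLoader` and `matchesAny` are hypothetical names. Whether the list is still needed at all is the open question in the comment above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; not part of Gobblin.
final class ExceptionClassLoader {

  // Resolves configured class names to exception types, eagerly (collect forces evaluation).
  static List<Class<? extends Exception>> load(List<String> classNames) {
    return classNames.stream()
        .map(ExceptionClassLoader::forName)
        .collect(Collectors.toList());
  }

  private static Class<? extends Exception> forName(String className) {
    try {
      // asSubclass throws ClassCastException if the class is not an Exception
      return Class.forName(className).asSubclass(Exception.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Unknown exception class in config: " + className, e);
    }
  }

  // True if e is an instance of any configured class, subclasses included.
  static boolean matchesAny(Exception e, List<Class<? extends Exception>> classes) {
    return classes.stream().anyMatch(c -> c.isInstance(e));
  }
}
```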
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -0,0 +1,125 @@
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
Review Comment:
are they all of type `DagActionStore.DagActionType.REEVALUATE`? Let's verify.
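In the real test, this could be checked by replacing the trailing `Mockito.any()` in the `verify(...)` call with `Mockito.eq(DagActionStore.DagActionType.REEVALUATE)`. The sketch below makes the same assertion without Mockito, using a hypothetical recording fake. It assumes that the fifth argument of `addJobDagAction` is the action type. `ActionType.OTHER` is a placeholder for the remaining action types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DagActionStore.DagActionType; OTHER is a placeholder.
enum ActionType { REEVALUATE, OTHER }

// Hypothetical recording fake for DagManagementStateStore.addJobDagAction.
final class RecordingDagActionSink {
  final List<ActionType> recorded = new ArrayList<>();

  // Assumed parameter order: flowGroup, flowName, flowExecutionId, jobName, actionType
  void addJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
      ActionType actionType) {
    recorded.add(actionType);
  }

  // True only if exactly expectedCount actions were recorded and all have the expected type.
  boolean allAre(ActionType expected, int expectedCount) {
    return recorded.size() == expectedCount && recorded.stream().allMatch(t -> t == expected);
  }
}
```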
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -0,0 +1,125 @@
+ }
+
+ @Test
+ public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException {
+ Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
+ Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+ Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+ Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+ Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode);
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics();
+ Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode);
+ Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+ Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class, dependsOnMethods = "testWhenSubmitToExecutorSuccess")
+ public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
+ Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+ JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
+ Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
+ dagNodeList.add(dagNode);
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+ SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
+ Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
+ DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
+ Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
+ Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class));
+ Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics();
+ Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode);
+ }
+
+ private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
+ Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
+ Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
+ Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
+ List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);
+
+ Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
+ Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
+ Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
+ .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
+ List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ Config jobConfig = jobConfigs.get(i);
+ FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
+ if(i==2){
+ jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
+ ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty()));
+ }
+ else{
+ jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
Review Comment:
formatting again off here
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -196,9 +196,15 @@ public void dagProcessingTest()
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);
-
+ // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
- Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
+ Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
Review Comment:
shouldn't `expectedNonRetryableExceptions` on L193 be removed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -149,6 +158,17 @@ public void run() {
dagTask.conclude();
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
+ if(!DagProcessingEngine.isTransientException(e)) {
+ DagActionStore.DagAction dagAction = dagTask.getDagAction();
+ if(dagAction!=null) {
+ log.error(
+ "Ignoring non transient exception. DagTask with dagId: {} and dagAction: {} will conclude and will not be retried.",
+ dagAction.getDagId(), dagAction.getDagActionType(), e);
+ }
Review Comment:
1. replace L162-167 w/ the equivalent:
```
log.error(dagProc.contextualizeStatus("ignoring non-transient exception by concluding so no retries"));
```
2. let's move the general announcement of the error before the special handling of non-transient exceptions, specifically:
```
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
```
3. in the above (2.) log it as `error` and keep the exception as the 2nd arg, but the message about ignoring can probably just be `warn` and needn't repeat the same stacktrace (so no exception as 2nd arg).
4. although this may go away, note that the way to format:
```
if(dagAction!=null) {
```
is w/ whitespace:
```
if (dagAction != null) {
```
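Here is a dependency-free sketch of the catch block after points 1 to 3. It logs one `ERROR` line with the exception attached, then, for the non-transient case, a short `WARN` line without it. The sketch captures log calls in a list instead of sending them through slf4j. `contextualizeStatus` is passed in as a function that stands in for `dagProc.contextualizeStatus(...)`, and a `true` return value stands in for concluding the `DagTask`. All of these are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the DagProcessingEngine catch block; not the actual Gobblin code.
final class EngineCatchSketch {

  // One captured log call: level, message, and the throwable passed as 2nd argument (or null).
  static final class LogLine {
    final String level;
    final String message;
    final Throwable thrown;

    LogLine(String level, String message, Throwable thrown) {
      this.level = level;
      this.message = message;
      this.thrown = thrown;
    }
  }

  final List<LogLine> logLines = new ArrayList<>();

  // Current policy in GOBBLIN-2181: no exception is considered transient yet.
  static boolean isTransientException(Exception e) {
    return false;
  }

  // Returns true if the task should be concluded (no retry), false if it should be retried.
  boolean onException(Function<String, String> contextualizeStatus, Exception e) {
    // Points 2 and 3: general announcement first, at ERROR, with the exception attached.
    logLines.add(new LogLine("ERROR", "DagProcEngineThread: " + contextualizeStatus.apply("error"), e));
    if (!isTransientException(e)) {
      // Points 1 and 3: short WARN without repeating the stacktrace.
      logLines.add(new LogLine("WARN",
          contextualizeStatus.apply("ignoring non-transient exception by concluding so no retries"), null));
      return true;
    }
    return false;
  }
}
```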
Issue Time Tracking
-------------------
Worklog Id: (was: 948182)
Time Spent: 1.5h (was: 1h 20m)
> Non transient exception handling by flowspec removal
> ----------------------------------------------------
>
> Key: GOBBLIN-2181
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2181
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Vaibhav Singhal
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> - We often see failures during flow initialization or processing that prevent the flow from concluding properly
> - Azkaban client exceptions and SQLIntegrityViolation exceptions are recent examples of such failures
> - Currently, most of these failures are treated as transient exceptions by default and are retried indefinitely
> - As a side effect, these flows never conclude, which in turn causes future flow submissions to fail; this has caused recent incidents
>
> - As a first step, we want to treat all exceptions as non-transient: do not retry them, and instead conclude the flow by removing the flowspec and the dag action
> - This issue tracks the changes to conclude the flow on non-transient exceptions and to mark such flows as failed, so that the flow's status is reflected correctly
--
This message was sent by Atlassian Jira
(v8.20.10#820010)