[ 
https://issues.apache.org/jira/browse/GOBBLIN-2039?focusedWorklogId=914030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-914030
 ]

ASF GitHub Bot logged work on GOBBLIN-2039:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Apr/24 20:01
            Start Date: 10/Apr/24 20:01
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3918:
URL: https://github.com/apache/gobblin/pull/3918#discussion_r1559987681


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -144,7 +144,12 @@ protected void initializeMonitor() {
     }
     // TODO: make this multi-threaded to add parallelism
     for (DagActionStore.DagAction action : dagActions) {
-      handleDagAction(action, true);
+      try {
+        handleDagAction(action, true);
+      } catch (Exception e) {
+        log.warn("Ran into an unexpected error processing DagActionStore changes during initialization for action {}", action, e);

Review Comment:
   `error` level?



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -98,6 +118,16 @@ public void setup() {
      mockDagActionStoreChangeMonitor.startUpForTest();
   }
 
+  @BeforeClass
+  public void setupTestDb() throws Exception {
+    this.testDb = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass
+  public void cleanup() throws IOException {
+    this.testDb.close();

Review Comment:
   shall we name `setUp()` and `tearDown()` like the other test?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -136,7 +136,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, String.valueOf(this.isFlowConcurrencyEnabled)));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
-    if (jobExecutionPlanDag.isEmpty()) {
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {

Review Comment:
   was this NPE always possible but unhandled, or does it only arise now as a 
result of these changes?



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -201,6 +231,40 @@ public void testProcessMessageWithDelete() throws SpecNotFoundException {
     verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
   }
 
+  @Test
+  public void testStartupSequenceHandlesFailures() throws Exception {
+    Config config = ConfigBuilder.create()
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+    String flowGroup = "testFlowGroup";
+    String flowName = "testFlowName";
+    String jobName = "testJobName";
+    String flowExecutionId = "12345677";
+
+    MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
+    mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
+
+    Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    DagManager mockDagManager = mock(DagManager.class);
+    FlowCatalog mockFlowCatalog = mock(FlowCatalog.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
+    // Throw an uncaught exception during startup sequence
+    when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));

Review Comment:
   should we verify that this method was actually called?
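
   One way to read this suggestion: the test only proves that failures are
   tolerated if the throwing stub is actually reached. In the test itself that
   would presumably be a Mockito `verify(...)` call in the same style as the
   existing `times(0)` check in testProcessMessageWithDelete. The sketch below
   shows the same idea without any mocking library. `ThrowingCatalog` and
   `StartupVerification` are hypothetical stand-ins, not Gobblin classes, and
   the URI is made up for illustration.

```java
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical fake (not Gobblin's FlowCatalog): always throws and counts invocations. */
final class ThrowingCatalog {
  private final AtomicInteger calls = new AtomicInteger();

  Object getSpecs(URI uri) {
    calls.incrementAndGet();
    throw new RuntimeException("Uncaught exception");
  }

  int callCount() {
    return calls.get();
  }
}

/** Hypothetical driver: runs a failure-tolerant startup step and reports the call count. */
final class StartupVerification {

  static int runStartup(ThrowingCatalog catalog, URI specUri) {
    try {
      catalog.getSpecs(specUri);
    } catch (RuntimeException e) {
      // Swallowed on purpose, mirroring a startup sequence that tolerates failures.
    }
    return catalog.callCount();
  }

  public static void main(String[] args) {
    // Illustrative URI only; the real test builds its inputs differently.
    int calls = runStartup(new ThrowingCatalog(), URI.create("flow:/testFlowGroup/testFlowName"));
    // Without this check the test would also pass if the stub were never reached.
    if (calls != 1) {
      throw new AssertionError("expected exactly one getSpecs call, got " + calls);
    }
    System.out.println("getSpecs calls: " + calls);
  }
}
```

   Asserting on the call count separates "the exception was thrown and
   handled" from "the code path never ran".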





Issue Time Tracking
-------------------

            Worklog Id:     (was: 914030)
    Remaining Estimate: 0h
            Time Spent: 10m

> Handle edge cases in compilation and startup of dag action change store 
> monitor
> -------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2039
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2039
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> DagActionStoreChangeMonitor loads all the dag actions in the MySQL db on 
> startup and submits them to the DagManager.
> Issues during this startup process can sometimes cause errors. We want to 
> handle them gracefully so that they do not block the dag action change 
> monitor from initializing; otherwise no dags can be processed.
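
The behaviour described above can be sketched as a loop that isolates each
action's failure. This is a minimal illustration under stated assumptions, not
the Gobblin implementation: `ResilientStartup` and `processAll` are hypothetical
names, plain strings stand in for `DagActionStore.DagAction`, and `System.err`
stands in for the logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in (not Gobblin code): isolates per-item failures during a startup load. */
final class ResilientStartup {

  /**
   * Applies the handler to every action. An exception from one action is
   * recorded and the loop moves on, so the startup pass always completes.
   * Returns the actions whose handling failed.
   */
  static <T> List<T> processAll(List<T> actions, Consumer<T> handler) {
    List<T> failed = new ArrayList<>();
    for (T action : actions) {
      try {
        handler.accept(action);
      } catch (Exception e) {
        // Report and continue instead of letting the exception abort startup.
        System.err.println("Unexpected error processing action " + action + ": " + e);
        failed.add(action);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> handled = new ArrayList<>();
    List<String> failed = processAll(Arrays.asList("launch-1", "bad", "kill-2"), action -> {
      if (action.equals("bad")) {
        throw new RuntimeException("Uncaught exception");
      }
      handled.add(action);
    });
    System.out.println("handled=" + handled + " failed=" + failed);
  }
}
```

Returning the failed actions is a design choice made for this sketch so that a
caller can retry or report them; the diff under review only logs the failure.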



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
