[
https://issues.apache.org/jira/browse/GOBBLIN-2039?focusedWorklogId=914030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-914030
]
ASF GitHub Bot logged work on GOBBLIN-2039:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Apr/24 20:01
Start Date: 10/Apr/24 20:01
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3918:
URL: https://github.com/apache/gobblin/pull/3918#discussion_r1559987681
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -144,7 +144,12 @@ protected void initializeMonitor() {
}
// TODO: make this multi-threaded to add parallelism
for (DagActionStore.DagAction action : dagActions) {
- handleDagAction(action, true);
+ try {
+ handleDagAction(action, true);
+ } catch (Exception e) {
+ log.warn("Ran into an unexpected error processing DagActionStore changes during initialization for action {}", action, e);
Review Comment:
Should this be logged at `error` level?
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -98,6 +118,16 @@ public void setup() {
mockDagActionStoreChangeMonitor.startUpForTest();
}
+ @BeforeClass
+ public void setupTestDb() throws Exception {
+ this.testDb = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass
+ public void cleanup() throws IOException {
+ this.testDb.close();
Review Comment:
Shall we name these `setUp()` and `tearDown()`, like the other test?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -136,7 +136,7 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
- if (jobExecutionPlanDag.isEmpty()) {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
Review Comment:
Was this NPE always possible but unhandled, or does it arise only as a
result of these changes?
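
For context on the guard in the hunk above, here is a minimal sketch of why the
null check matters whenever a compile step can return null. It does not answer
whether the real `specCompiler.compileFlow` could always return null. The class
`NullSafeCompile`, its `compileFlow` stand-in, and the use of `List<String>` in
place of `Dag<JobExecutionPlan>` are all hypothetical, chosen only to keep the
example self-contained.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class NullSafeCompile {
  // Hypothetical stand-in for a compiler: returns null when it cannot
  // compile the spec, and an empty list when the spec yields no jobs.
  public static List<String> compileFlow(String flowSpec) {
    if (flowSpec == null || flowSpec.isEmpty()) {
      return null;
    }
    if (flowSpec.equals("no-jobs")) {
      return Collections.emptyList();
    }
    return Collections.singletonList(flowSpec + "-job");
  }

  // Treats both null and empty results as "nothing to run". Without the
  // null check, dag.isEmpty() would throw a NullPointerException.
  public static Optional<List<String>> validate(String flowSpec) {
    List<String> dag = compileFlow(flowSpec);
    if (dag == null || dag.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(dag);
  }

  public static void main(String[] args) {
    System.out.println(validate(null).isPresent());
    System.out.println(validate("no-jobs").isPresent());
    System.out.println(validate("flowA").isPresent());
  }
}
```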
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java:
##########
@@ -201,6 +231,40 @@ public void testProcessMessageWithDelete() throws SpecNotFoundException {
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}
+ @Test
+ public void testStartupSequenceHandlesFailures() throws Exception {
+ Config config = ConfigBuilder.create()
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+ String flowGroup = "testFlowGroup";
+ String flowName = "testFlowName";
+ String jobName = "testJobName";
+ String flowExecutionId = "12345677";
+
+ MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
+ mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
+
+ Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+ .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+ .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+ DagManager mockDagManager = mock(DagManager.class);
+ FlowCatalog mockFlowCatalog = mock(FlowCatalog.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
+ // Throw an uncaught exception during startup sequence
+ when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));
Review Comment:
Should we verify that this method was actually called?
Issue Time Tracking
-------------------
Worklog Id: (was: 914030)
Remaining Estimate: 0h
Time Spent: 10m
> Handle edge cases in compilation and startup of dag action change store monitor
> -------------------------------------------------------------------------------
>
> Key: GOBBLIN-2039
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2039
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> On startup, DagActionStoreChangeMonitor loads all dag actions from the MySQL
> db and submits them to the DagManager.
> Failures can occur during this startup process. We want to handle them
> gracefully so that they do not block the monitor's initialization; otherwise
> no dags can be processed.
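
The behavior described above can be sketched roughly as follows: each action is
handled inside its own try/catch, so one failure is logged and skipped rather
than aborting the whole startup loop. This is a simplified illustration, not
the Gobblin implementation. `ResilientStartup`, `processAll`, and the string
action names are hypothetical, and `System.err` stands in for the real logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class ResilientStartup {
  // Applies the handler to every action. A failing action is logged and
  // recorded, and the loop continues with the next one.
  // Returns the actions whose handler threw.
  public static <T> List<T> processAll(List<T> actions, Consumer<T> handler) {
    List<T> failed = new ArrayList<>();
    for (T action : actions) {
      try {
        handler.accept(action);
      } catch (Exception e) {
        // Stand-in for log.warn / log.error in a real service.
        System.err.println("Unexpected error processing action " + action
            + " during initialization: " + e);
        failed.add(action);
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> handled = new ArrayList<>();
    List<String> failed = processAll(
        Arrays.asList("launch-1", "bad", "kill-2"),
        action -> {
          if (action.equals("bad")) {
            throw new RuntimeException("Uncaught exception");
          }
          handled.add(action);
        });
    System.out.println("handled=" + handled + " failed=" + failed);
  }
}
```

Returning the failed actions, rather than only logging them, leaves the caller
free to retry or report them once initialization has finished.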