umustafi commented on code in PR #3973:
URL: https://github.com/apache/gobblin/pull/3973#discussion_r1640236504
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -224,31 +224,41 @@ protected void processMessage(DecodeableKafkaRecord message) {
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType);
+  handleDagAction(operation, dagAction, flowGroup, flowName, flowExecutionId, dagActionType);
+
+  dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+ }
+
+ protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup, String flowName,
+     long flowExecutionId, DagActionStore.DagActionType dagActionType) {
// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
-  if (operation.equals("INSERT")) {
-    handleDagAction(dagAction, false);
-  } else if (operation.equals("UPDATE")) {
-    // TODO: change this warning message and process updates if for launch or reevaluate type
-    log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be "
-        + "updated. Flow group: {} name {} executionId {} were updated to action {}", flowGroup, flowName,
-        flowExecutionId, dagActionType);
-    this.unexpectedErrors.mark();
-  } else if (operation.equals("DELETE")) {
-    log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
-  } else {
-    log.warn("Received unsupported change type of operation {}. Expected values to be in [INSERT, UPDATE, DELETE]",
-        operation);
-    this.unexpectedErrors.mark();
-    return;
+  switch (operation) {
Review Comment:
nice improvement
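For reference, here is roughly how the full switch could read once the deleted if/else branches above are folded in (a sketch reconstructed from the removed lines, not necessarily the PR's final code):
```java
switch (operation) {
  case "INSERT":
    handleDagAction(dagAction, false);
    break;
  case "UPDATE":
    // TODO: change this warning message and process updates if for launch or reevaluate type
    log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be "
        + "updated. Flow group: {} name {} executionId {} were updated to action {}", flowGroup, flowName,
        flowExecutionId, dagActionType);
    this.unexpectedErrors.mark();
    break;
  case "DELETE":
    log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
    break;
  default:
    log.warn("Received unsupported change type of operation {}. Expected values to be in [INSERT, UPDATE, DELETE]",
        operation);
    this.unexpectedErrors.mark();
}
```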
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.quartz.SchedulerException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagManagementDagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitorTest {
public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName();
+ private final int PARTITION = 1;
+ private final int OFFSET = 1;
+ private final String FLOW_GROUP = "flowGroup";
+ private final String FLOW_NAME = "flowName";
+ private final long FLOW_EXECUTION_ID = 123L;
+ private final String JOB_NAME = "jobName";
+ private MockDagManagementDagActionStoreChangeMonitor mockDagManagementDagActionStoreChangeMonitor;
+ private int txidCounter = 0;
+
+ private static final DagActionReminderScheduler dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
+
+ /**
+ * Note: The class methods are wrapped in a test specific method because the original methods are package protected
+ * and cannot be accessed by this class.
+ */
static class MockDagManagementDagActionStoreChangeMonitor extends DagManagementDagActionStoreChangeMonitor {
+
+   public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) {
+     super(config, numThreads, mock(FlowCatalog.class), mock(Orchestrator.class), mock(DagManagementStateStore.class),
+         isMultiActiveSchedulerEnabled, mock(DagManagement.class), dagActionReminderScheduler);
+   }
+   protected void processMessageForTest(DecodeableKafkaRecord<String, DagActionStoreChangeEvent> record) {
+     super.processMessage(record);
+   }
+ }
+
+ MockDagManagementDagActionStoreChangeMonitor createMockDagManagementDagActionStoreChangeMonitor() {
+   Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+       .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+       .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+       .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+ return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true);
+ }
+
+ // Called at start of every test so the count of each method being called is reset to 0
+ @BeforeMethod
+ public void setupMockMonitor() {
+   mockDagManagementDagActionStoreChangeMonitor = createMockDagManagementDagActionStoreChangeMonitor();
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any());
+
+ }
+
+ /**
+ * Tests process message with a DELETE type message which should be ignored regardless of the flow information.
Review Comment:
Is this description accurate? The DELETE is not ignored anymore.
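If DELETE now unschedules the pending reminder, the test (and its Javadoc) could assert that instead. A rough sketch only: createRecord(...) is a hypothetical helper for wrapping a DagActionStoreChangeEvent into a DecodeableKafkaRecord, and the asserted interaction and DagActionValue constant are my assumptions:
```java
@Test
public void testProcessMessageWithDelete() throws SchedulerException {
  // createRecord(...) is a hypothetical helper, not part of this PR
  DecodeableKafkaRecord<String, DagActionStoreChangeEvent> record =
      createRecord(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, JOB_NAME, DagActionValue.LAUNCH);
  mockDagManagementDagActionStoreChangeMonitor.processMessageForTest(record);
  // DELETE should no longer be a silent no-op: expect the reminder for this dag action to be unscheduled
  verify(dagActionReminderScheduler, times(1)).unscheduleReminderJob(any());
}
```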
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -37,16 +40,56 @@
@Slf4j
public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
private final DagManagement dagManagement;
+ @VisibleForTesting @Getter
+ private final DagActionReminderScheduler dagActionReminderScheduler;
// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
    FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
-   boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+   boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, DagActionReminderScheduler dagActionReminderScheduler) {
Review Comment:
Do you need to make sure the initialization order in Guice works here?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java:
##########
@@ -93,25 +81,26 @@ public void resumeDag()
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-  doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+  this.dagManagementStateStore.checkpointDag(dag);
+  // simulate it as a failed dag
+  this.dagManagementStateStore.markDagFailed(dag);
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName,
    flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
    null, this.dagManagementStateStore));
resumeDagProc.process(this.dagManagementStateStore);
-  SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
-  List<SpecProducer<Spec>> otherSpecProducers = dag.getNodes().stream().map(node -> {
-    try {
-      return DagManagerUtils.getSpecProducer(node);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }).filter(sp -> specProducer != sp).collect(Collectors.toList());
int expectedNumOfResumedJobs = 1; // = number of resumed nodes
-  Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+  // only the current job should have run
+  // we cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
+  // and retrieved, hence it goes through serialization/deserialization; in this process the SpecProducer objects in
Review Comment:
Also change this to use a multi-line comment.
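i.e. something like (same text, just in block-comment form; the trailing "..." marks where the comment is cut off in this view):
```java
/*
 * Only the current job should have run.
 * We cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
 * and retrieved, hence it goes through serialization/deserialization; in this process the SpecProducer objects in ...
 */
```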
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -228,12 +225,12 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
this.eventProducer.emitObservabilityEvent(jobStatus);
}
- if (this.dagProcEngineEnabled) {
+ if (this.dagProcEngineEnabled && isJobLevelStatus(jobName)) {
Review Comment:
Do you have a test for isJobLevelStatus or for this monitoring method?
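For example, a direct unit test along these lines could cover it. This is only a sketch: it assumes isJobLevelStatus is (or can be made) visible to the test, and that it simply distinguishes real job names from the flow-level placeholder name, both of which are assumptions on my part:
```java
@Test
public void testIsJobLevelStatus() {
  // monitor is a KafkaJobStatusMonitor test fixture; the placeholder-name semantics are assumed
  Assert.assertTrue(monitor.isJobLevelStatus("myJobName"));
  Assert.assertFalse(monitor.isJobLevelStatus(JobStatusRetriever.NA_KEY));
}
```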
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -180,4 +175,29 @@ public static Dag<JobExecutionPlan> buildDagWithMultipleNodesAtDifferentLevels(S
}
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
+
+ public static void mockDMSSCommonBehavior(DagManagementStateStore dagManagementStateStore) throws IOException, SpecNotFoundException {
+   doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+ }
+
+ public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+ String specStoreDir = "/tmp/specStoreDir";
Review Comment:
add comments
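For example, something like the following; the wording is only a suggestion, and what "naive" means here is my guess from the method name:
```java
/**
 * Builds a bare-bones {@link TopologySpec} for tests, pointed at a throwaway spec store dir,
 * so DagProc tests can resolve a spec producer without standing up a real executor.
 */
public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
  // temporary spec store location used only by tests
  String specStoreDir = "/tmp/specStoreDir";
```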
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java:
##########
@@ -93,25 +81,26 @@ public void resumeDag()
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-  doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+  this.dagManagementStateStore.checkpointDag(dag);
+  // simulate it as a failed dag
+  this.dagManagementStateStore.markDagFailed(dag);
ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName,
    flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
    null, this.dagManagementStateStore));
resumeDagProc.process(this.dagManagementStateStore);
-  SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
-  List<SpecProducer<Spec>> otherSpecProducers = dag.getNodes().stream().map(node -> {
-    try {
-      return DagManagerUtils.getSpecProducer(node);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }).filter(sp -> specProducer != sp).collect(Collectors.toList());
int expectedNumOfResumedJobs = 1; // = number of resumed nodes
-  Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+  // only the current job should have run
+  // we cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
+  // and retrieved, hence it goes through serialization/deserialization; in this process the SpecProducer objects in
Review Comment:
nit: extra space before "the specProducer"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -37,16 +40,56 @@
@Slf4j
public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
private final DagManagement dagManagement;
+ @VisibleForTesting @Getter
+ private final DagActionReminderScheduler dagActionReminderScheduler;
// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically rather than through the config.
public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
    FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
-   boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+   boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, DagActionReminderScheduler dagActionReminderScheduler) {
  // DagManager is only needed in the `handleDagAction` method of its parent class and not needed in this class,
  // so we are passing a null value for DagManager to its parent class.
  super("", config, null, numThreads, flowCatalog, orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
  this.dagManagement = dagManagement;
+   this.dagActionReminderScheduler = dagActionReminderScheduler;
+ }
+
+ @Override
+ protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup, String flowName,
+     long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+   // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
+   // {@link DagActionStore.FlowActionType} flow requests that have to be processed.
+   try {
+     switch (operation) {
+       case "INSERT":
+         handleDagAction(dagAction, false);
+         break;
+       case "UPDATE":
+         // TODO: change this warning message and process updates if for launch or reevaluate type
+         log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be "
Review Comment:
As you're saying, this is no longer true, so we should update this message.
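One possible rewording, as a sketch only (keeping the TODO but dropping the claim that values are never updated; exact wording up to you):
```java
case "UPDATE":
  // TODO: process UPDATEs for launch or reevaluate type dag actions instead of only warning
  log.warn("Received an UPDATE to the DagActionStore for flow group: {} name: {} executionId: {} action: {}; "
      + "UPDATE operations are not handled by this monitor yet", flowGroup, flowName, flowExecutionId, dagActionType);
  this.unexpectedErrors.mark();
  break;
```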
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]