umustafi commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1610441614


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +151,37 @@ public DagTask next() {
       }
   }
 
+  private void createJobStartDeadlineTrigger(DagActionStore.DagAction 
dagAction) throws SchedulerException, IOException {
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
+        dagAction.getDagId()).get().getNodes().get(0), 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    // todo - this timestamp is just an approximation, the real job submission 
has happened in past, and that is when a

Review Comment:
   is this approximation okay for us or you plan on updating this in the future 
to use the actual SLA deadline from when job submission occurred? 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag 
does not finish in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowSla) {
+      log.info("Dag {} exceeded the SLA of {} ms. Killing it now...", 
getDagId(), flowSla);
+      List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = 
dag.get().getNodes();
+      log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), getDagId());

Review Comment:
   can we consolidate these logs into one so easier to track. Let's also make 
the message something easily searchable
   
   `Dag exceeded Flow Finish SLA. Killing all jobs associated with it. Dag: {} 
SLA {} Num nodes to kill {} DagID: {}` then the info is all in one log that's 
easier to search.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is
+     killed because not being able to start in the deadline time.
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  /*
+   This test simulate submitting a dag with a very little flow finish deadline 
and then verifies that all of its job are

Review Comment:
   see above



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -124,9 +131,18 @@ public DagTask next() {
       while (true) {
         try {
           DagActionStore.DagAction dagAction = this.dagActionQueue.take();
-          LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
-          if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
-            return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+          // create triggers for original (non-reminder) dag actions of type 
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE
+          // reminder triggers are used to inform hosts once the deadline for 
ENFORCE_JOB_START and ENFORCE_FLOW_FINISH passed

Review Comment:
   missing "_DEADLINE" also nit: let's use a multi-line comment /*....*/



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
     @Override
     public void execute(JobExecutionContext context) {
       // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType);
+      //dagAction.setReminder(true);

Review Comment:
   actually we have a better method:
   pass a flag to `createReminderJobTrigger` when the reminder is created to 
denote whether it's actually a reminder or not so for deadline triggers the 
first time we are created we don't set this value then for any subsequent 
attempt we set reminder=True. when we create reminder for all other actions 
types we set reminder=True



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
     @Override
     public void execute(JobExecutionContext context) {
       // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
       String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
       String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      String flowExecutionId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
       DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
 
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
-          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+          + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName 
+ ", dagActionType: " + dagActionType + ")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, 
dagActionType);
+      //dagAction.setReminder(true);

Review Comment:
   do we not always want to set the reminder flag for actions coming from the 
trigger (potentially other than the deadline actions depending on how we decide 
to implement)
   
   option 1: set reminder flag for all events
   - handle deadline events knowing when it's a reminder = True is when we 
enforce
   
   option 2: set reminder flag for all events except deadline if there's some 
other job property such as "firstTimeFiring=True" 
   - set firstTimeFiring=False so all subsequent deadline actions are known to 
be true "reminders"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag 
does not finish in
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask 
enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceFlowFinishDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+    long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowSla) {

Review Comment:
   let's add a warn/error log and/or metric (can be TODO) for when this is not 
true 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -191,4 +196,16 @@ private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Da
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
     throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
+
+  private void removeFlowFinishDeadlineTriggerAndDagAction() {
+    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = 
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),

Review Comment:
   let's log when this function is used in case it has unintended consequences



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {
+      return;
+    }
+
+    ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
+    long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
+    // note that second condition should be true because the triggered dag 
action has waited enough before reaching here
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() > 
jobOrchestratedTime + timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing 
the job now...",

Review Comment:
   I suggest a similar change in log message to put a cohesive message 
beforehand that's easily searchable then all the parameters at the end



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store

Review Comment:
   add TODO to include a metric here that we will alert on 



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     }
   }
 
+  private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+

Review Comment:
   same as above let's log when this is used and javadoc for when it should be 
called



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
or does not finish in

Review Comment:
   is this not only for the latter?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
or does not finish in

Review Comment:
   only the former right



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is
+     killed because not being able to start in the deadline time.
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new 
EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());

Review Comment:
   do both deadlines result in same method being called?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask 
enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceJobStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", 
getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = 
dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {

Review Comment:
   does this mean the job finished? if so let's comment



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  /*
+    This test simulate submitting a dag with a very little job start deadline 
and then verifies that the starting job is

Review Comment:
   "with very short job start deadline that will definitely be breached, 
resulting in the job requiring to be killed..." (or something to that effect



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -134,6 +146,12 @@ public void testOneNextJobToRun() throws Exception {
     // current job's state is deleted
     
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+    
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+        .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);

Review Comment:
   why are these called if the reminder fired and action was taken? I would 
expect deleteDagAction because it means the action has been completed 
(committed)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     }
   }
 
+  private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+

Review Comment:
   also put todo for a metric



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to