umustafi commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1605444064


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -30,12 +30,14 @@
 public interface DagActionStore {
   public static final String NO_JOB_NAME_DEFAULT = "";
   enum DagActionType {
+    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
     KILL, // Kill invoked through API call
-    RESUME, // Resume flow invoked through API call
     LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
+    REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final 
job status
+    RESUME, // Resume flow invoked through API call
     RETRY, // Invoked through DagManager for flows configured to allow retries
-    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
-    REEVALUATE // Re-evaluate what needs to be done upon receipt of a final 
job status
+    ENFORCE_START_DEADLINE, // Enforce job start deadline
+    ENFORCE_FINISH_DEADLINE, // Enforce job finish deadline

Review Comment:
   are both job level or the finish is flow level?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -69,6 +72,10 @@ public DagNodeId getDagNodeId() {
       return new DagNodeId(this.flowGroup, this.flowName,
           Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
     }
+
+    public void setReminder(boolean isReminder) {

Review Comment:
   replace with @setter Lombok annotation



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void 
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     }
   }
 
+  private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String 
flowName, String flowExecutionId, String jobName) {
+
+    DagActionStore.DagAction enforceStartDeadlineDagAction = new 
DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), jobName, 
DagActionStore.DagActionType.ENFORCE_START_DEADLINE);
+    try {
+      
GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
+      
GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+    } catch (SchedulerException | IOException e) {
+      log.warn("Failed to unschedule the reminder for {}", 
enforceStartDeadlineDagAction);

Review Comment:
   error level logging maybe. although its possible the scheduler reminder 
fired and was deleted so that can be warn while the other is error.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -191,4 +196,16 @@ private void submitNextNodes(DagManagementStateStore 
dagManagementStateStore, Da
   private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> 
nextNodes) {
     throw new UnsupportedOperationException("More than one start job is not 
allowed");
   }
+
+  private void clearFinishDeadlineTriggerAndDagAction() {

Review Comment:
   nit: rename to "removeFinish..."



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  @Test
+  public void enforceStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new 
EnforceStartDeadlineDagProc(
+        new EnforceStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  @Test
+  public void enforceFinishDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    int numOfDagNodes = 5;
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        numOfDagNodes, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));

Review Comment:
   common code in two tests can we make class var



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       throw new RuntimeException(e);
     }
   }
+
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
+      
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 props).get();
+      // todo - why was it not being cleaned up in DagManager?
+      dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static void cancelDag(Dag<JobExecutionPlan> dag, 
DagManagementStateStore dagManagementStateStore) throws IOException {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+    log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+    }
+  }
+
+  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+    
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+    jobExecutionPlan.setExecutionStatus(CANCELLED);
+  }
+
+  private static void 
sendEnforceStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)

Review Comment:
   do we not have a method for enforceJobFinishDeadlineDagAction?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  @Test
+  public void enforceStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new 
EnforceStartDeadlineDagProc(
+        new EnforceStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  @Test
+  public void enforceFinishDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());

Review Comment:
   can we move this to setup?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
       }
   }
 
+  private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+      throws SchedulerException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeToCreateTriggerFor =
+        
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+    TimeUnit jobStartTimeUnit = 
TimeUnit.valueOf(ConfigUtils.getString(this.config, 
DagManager.JOB_START_SLA_UNITS,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    long defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+        DagManager.JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(), 
defaultJobStartSlaTimeMillis);
+    long jobOrchestratedTime = 
dagNodeToCreateTriggerFor.getRight().get().getOrchestratedTime();
+    long reminderDuration = jobOrchestratedTime + timeOutForJobStart - 
System.currentTimeMillis();

Review Comment:
   when this fires is that when it should have completed by? let's think about 
when we should calculate this time and who should store it? dagAction and/or 
task and/or reminder



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
       }
   }
 
+  private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+      throws SchedulerException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeToCreateTriggerFor =
+        
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+    TimeUnit jobStartTimeUnit = 
TimeUnit.valueOf(ConfigUtils.getString(this.config, 
DagManager.JOB_START_SLA_UNITS,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    long defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+        DagManager.JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(), 
defaultJobStartSlaTimeMillis);

Review Comment:
   what does timeout mean the time it should have started by?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceStartDeadlineDagProc.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceStartDeadlineDagProc(EnforceStartDeadlineDagTask 
enforceStartDeadlineDagTask) {
+    super(enforceStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store

Review Comment:
   log an error here then



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -96,8 +98,8 @@ public void execute(JobExecutionContext context) {
       log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
           + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
 
-      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
-          dagActionType);
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName, dagActionType);
+      dagAction.setReminder(true);

Review Comment:
   why not set this in the line above?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -124,9 +135,16 @@ public DagTask next() {
       while (true) {
         try {
           DagActionStore.DagAction dagAction = this.dagActionQueue.take();
-          LeaseAttemptStatus leaseAttemptStatus = 
retrieveLeaseStatus(dagAction);
-          if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
-            return createDagTask(dagAction, 
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+          // create triggers for original (non-reminder) dag actions of type 
ENFORCE_START_DEADLINE and ENFORCE_FINISH_DEADLINE

Review Comment:
   comment logic of why we don't get lease for these deadlines after getting 
from queue rather at the time of enforcement so easy to understand



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void 
submitJobToExecutor(DagManagementStateStore dagManagementStat
       throw new RuntimeException(e);
     }
   }
+
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> 
dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws 
IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if 
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = 
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, 
serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending 
cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());

Review Comment:
   should we emit a metric here? is this a valid case or should it be an error 
let's comment more info here



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  @Test
+  public void enforceStartDeadlineTest() throws Exception {

Review Comment:
   javadocs to describe what's happening in these tests



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +153,46 @@ public DagTask next() {
       }
   }
 
+  private void createStartDeadlineTrigger(DagActionStore.DagAction dagAction)
+      throws SchedulerException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>> 
dagNodeToCreateTriggerFor =
+        
this.dagManagementStateStore.getDagNodeWithJobStatus(dagAction.getDagNodeId());
+
+    TimeUnit jobStartTimeUnit = 
TimeUnit.valueOf(ConfigUtils.getString(this.config, 
DagManager.JOB_START_SLA_UNITS,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    long defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(this.config,
+        DagManager.JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+
+    long timeOutForJobStart = 
DagManagerUtils.getJobStartSla(dagNodeToCreateTriggerFor.getLeft().get(), 
defaultJobStartSlaTimeMillis);
+    long jobOrchestratedTime = 
dagNodeToCreateTriggerFor.getRight().get().getOrchestratedTime();
+    long reminderDuration = jobOrchestratedTime + timeOutForJobStart - 
System.currentTimeMillis();

Review Comment:
   it feels like dagTask and proc should have an epoch that will contain this 
information. we can save it in the reminder jobDataMap if you overload the 
method to schedule the reminder



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceFinishDeadlineDagTask;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private final MockedStatic<GobblinServiceManager> 
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    // `.close()` to avoid (in the aggregate, across multiple suites) - 
java.sql.SQLNonTransientConnectionException: Too many connections
+    this.testMetastoreDatabase.close();
+    this.mockedGobblinServiceManager.close();
+  }
+
+  @Test
+  public void enforceStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = 
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, 
ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test 
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), 
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> 
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that 
has not yet started running
+
+    EnforceStartDeadlineDagProc enforceStartDeadlineDagProc = new 
EnforceStartDeadlineDagProc(
+        new EnforceStartDeadlineDagTask(new 
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_START_DEADLINE), 
null, mock(DagActionStore.class)));
+    enforceStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node 
corresponding to the EnforceStartDeadlineDagProc
+    Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+        
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+            .filter(a -> 
a.getMethod().getName().equals("deleteDagNodeState")).count());
+  }
+
+  @Test
+  public void enforceFinishDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();

Review Comment:
   global variables?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceStartDeadlineDagProc.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import 
org.apache.gobblin.service.modules.orchestration.task.EnforceStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed 
and cancel the job if it does not start in
+ * {@link 
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME} 
time.
+ */
+@Slf4j
+public class EnforceStartDeadlineDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceStartDeadlineDagProc(EnforceStartDeadlineDagTask 
enforceStartDeadlineDagTask) {
+    super(enforceStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+   return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, 
Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce deadlines for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already 
cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceStartDeadline(dagManagementStateStore, dag);
+  }
+
+  private void enforceStartDeadline(DagManagementStateStore 
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, 
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = 
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which DEADLINE_ENFORCEMENT dag 
action is created must have a dag node in store
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = 
dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, 
1000000L);//, this.defaultJobStartSlaTimeMillis);

Review Comment:
   replace with default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to