umustafi commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343192925


##########
gobblin-rest-service/gobblin-rest-api/src/main/snapshot/org.apache.gobblin.rest.jobExecutions.snapshot.json:
##########


Review Comment:
   Is this meant to be part of your change, or was it included by mistake?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);

Review Comment:
   Are these dagStart or dagKill deadlines? Both are configurable, so we may 
want to create separate functions for each.
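
   A minimal sketch of what separate accessors could look like. The interface, 
class, and method names below are hypothetical and not part of this PR; they 
only illustrate keeping the start and kill deadlines apart:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: these names are illustrative, not the PR's API.
interface DagDeadlineStore {
  void addFlowStartDeadlineMillis(String dagId, long deadlineMillis);

  void addFlowKillDeadlineMillis(String dagId, long deadlineMillis);

  Optional<Long> getFlowStartDeadlineMillis(String dagId);

  Optional<Long> getFlowKillDeadlineMillis(String dagId);
}

// In-memory implementation that keeps one map per deadline type.
class InMemoryDagDeadlineStore implements DagDeadlineStore {
  private final Map<String, Long> startDeadlines = new HashMap<>();
  private final Map<String, Long> killDeadlines = new HashMap<>();

  @Override
  public synchronized void addFlowStartDeadlineMillis(String dagId, long deadlineMillis) {
    // Keep the first value, mirroring the putIfAbsent used by addDagSLA in the diff.
    startDeadlines.putIfAbsent(dagId, deadlineMillis);
  }

  @Override
  public synchronized void addFlowKillDeadlineMillis(String dagId, long deadlineMillis) {
    killDeadlines.putIfAbsent(dagId, deadlineMillis);
  }

  @Override
  public synchronized Optional<Long> getFlowStartDeadlineMillis(String dagId) {
    return Optional.ofNullable(startDeadlines.get(dagId));
  }

  @Override
  public synchronized Optional<Long> getFlowKillDeadlineMillis(String dagId) {
    return Optional.ofNullable(killDeadlines.get(dagId));
  }
}
```

   With this shape a caller cannot mix up the two deadlines, and a missing 
entry shows up as an empty `Optional` rather than a null `Long`.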



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;

Review Comment:
   Use `millis` (rather than `ms`) in the name to differentiate from `micros`; 
also add the time unit to the polling interval name.
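
   For illustration, a sketch of the naming I have in mind. The class name is 
hypothetical, and the polling interval being in seconds is my assumption, since 
the diff does not state its unit:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical renaming sketch; not the PR's actual constants.
class EngineDefaults {
  // Unit spelled out as MILLIS so it cannot be read as micros.
  static final long DEFAULT_RETRY_DELAY_MILLIS = 1000L;

  // ASSUMPTION: the polling interval is in seconds; the diff does not say.
  static final int DEFAULT_JOB_STATUS_POLLING_INTERVAL_SECONDS = 10;

  // Converts the polling interval to milliseconds for APIs that expect millis.
  static long pollingIntervalMillis() {
    return TimeUnit.SECONDS.toMillis(DEFAULT_JOB_STATUS_POLLING_INTERVAL_SECONDS);
  }
}
```

   The same suffix would ideally carry through to the config key names so the 
key and its default agree on the unit.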



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -171,6 +176,12 @@ public void configure(Binder binder) {
     if (serviceConfig.isMultiActiveSchedulerEnabled()) {
       
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
       binder.bind(FlowTriggerHandler.class);
+      if(serviceConfig.isDagProcessingEngineEnabled()) {
+        
binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
+        binder.bind(DagProcFactory.class).in(Singleton.class);
+        binder.bind(DagProcessingEngine.class).in(Singleton.class);
+        binder.bind(DagTaskStream.class).in(Singleton.class);

Review Comment:
   I believe you need to create an `OptionalBinder`, like I've done above with 
`OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);`, as 
each of these classes is instantiated only in certain cases.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }

Review Comment:
   Let's move this check into the conditional above.
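
   Roughly the shape I mean, as a self-contained sketch. All types and names 
here are simplified, hypothetical stand-ins for the PR's classes (an `Integer` 
plays the dag action, a `String` plays the `DagTask`):

```java
import java.util.concurrent.BlockingQueue;

// Simplified, hypothetical stand-ins for the PR's types.
class LeaseLoopSketch {
  // Stand-in for the lease attempt: only even-numbered actions obtain a lease.
  static boolean leaseObtained(int action) {
    return action % 2 == 0;
  }

  // Stand-in for createDagTask: action 0 is "unsupported" and yields null.
  static String createTask(int action) {
    return action == 0 ? null : "task-" + action;
  }

  // Loops until an action both obtains a lease and produces a task.
  static String nextTask(BlockingQueue<Integer> actions) {
    while (true) {
      int action;
      try {
        // take() blocks while the queue is empty.
        action = actions.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for an action", e);
      }
      if (leaseObtained(action)) {
        String task = createTask(action);
        // The null check lives inside the conditional that can set the task.
        if (task != null) {
          return task;
        }
      }
    }
  }
}
```

   Note that the sketch uses `BlockingQueue.take()`, which blocks while the 
queue is empty, whereas `poll()` returns null immediately in that case.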



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+

Review Comment:
   Extra newline. Also, is it launched any other way?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");

Review Comment:
   Let's update this message to be more specific, e.g. `Received flowActionType 
that is not handled by createDagTask`. Instead of returning null, we should 
either return an `Optional` and check `Optional.isPresent`, or throw an 
exception and catch it.
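
   A sketch of the `Optional` variant, assuming `java.util.Optional`. The class 
is a hypothetical stand-in: a `String` plays the `DagTask`, and only the enum 
constant names mirror the switch in the diff:

```java
import java.util.Optional;

// Hypothetical stand-in for the switch in createDagTask.
class CreateTaskSketch {
  enum FlowActionType { KILL, RESUME, LAUNCH, ADVANCE }

  // Returns a task for handled action types, or empty for unhandled ones.
  static Optional<String> createDagTask(FlowActionType type) {
    switch (type) {
      case KILL:
        return Optional.of("KillDagTask");
      default:
        // The real code would go through the logger instead of stderr.
        System.err.println(
            "Received flowActionType that is not handled by createDagTask: " + type);
        return Optional.empty();
    }
  }
}
```

   The caller then checks `isPresent()` before breaking out of the loop, so a 
forgotten null check can no longer slip through.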



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();

Review Comment:
   Shall we rename this to `dagIdToRunningJob`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+  private Optional<DagActionStore> dagActionStore;
+
+  @Override
+  public synchronized void deleteJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.jobToDag.remove(dagNode);
+    this.dagToJobs.get(dagId).remove(dagNode);
+    this.dagToSLA.remove(dagId);
+  }
+
+
+  @Override
+  public synchronized void addJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {
+    Dag<JobExecutionPlan> dag = this.dagIdToDags.get(dagId);
+    this.jobToDag.put(dagNode, dag);
+    if (this.dagToJobs.containsKey(dagId)) {
+      this.dagToJobs.get(dagId).add(dagNode);
+    } else {
+      LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
Lists.newLinkedList();
+      dagNodeList.add(dagNode);
+      this.dagToJobs.put(dagId, dagNodeList);
+    }
+  }
+
+  @Override
+  public synchronized boolean hasRunningJobs(String dagId) {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+    return dagNodes != null && !dagNodes.isEmpty();
+  }
+
+  @Override
+  public synchronized void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException {
+    if (this.dagActionStore.isPresent()) {
+      this.dagActionStore.get().deleteDagAction(
+          new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, flowActionType));
+    }
+  }
+
+  @Override
+  public void addDagSLA(String dagId, Long flowSla) {
+    this.dagToSLA.putIfAbsent(dagId, flowSla);
+  }
+
+  @Override
+  public Long getDagSLA(String dagId) {
+    if(this.dagToSLA.containsKey(dagId)) {
+      return this.dagToSLA.get(dagId);
+    }
+    return null;
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) {
+    if(this.dagIdToDags.containsKey(dagId)) {
+      return this.dagIdToDags.get(dagId);
+    }
+    return null;

Review Comment:
   We should return an `Optional` rather than null; it is hard to remember to 
do the null check.
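
   Something along these lines, as a sketch. The class is hypothetical and a 
`String` stands in for `Dag<JobExecutionPlan>`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplification of the state store's dag lookup.
class DagLookupSketch {
  private final Map<String, String> dagIdToDags = new HashMap<>();

  synchronized void addDag(String dagId, String dag) {
    dagIdToDags.put(dagId, dag);
  }

  // One map lookup; an absent dagId becomes Optional.empty() instead of null.
  synchronized Optional<String> getDag(String dagId) {
    return Optional.ofNullable(dagIdToDags.get(dagId));
  }
}
```

   This also replaces the `containsKey` plus `get` pair with a single lookup, 
and callers can use `ifPresent` or `orElseThrow` to make the missing-dag case 
explicit.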



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Enforce cancel on the job if the job has been "orphaned". A job is 
orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED 
state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, 
JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    //TODO: timestamps needs to be fetched from database instead of using 
System.currentTimeMillis()
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
System.currentTimeMillis());
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Enforce 
cancellation of the job ...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);

Review Comment:
   It looks like this doesn't actually kill or restart the job; we just return 
a boolean. Let's rename this function and the one above to 
`is...DeadlineExceeded`, and add a separate function that enforces the deadline 
and performs the action.
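
   A rough sketch of that split, under stated assumptions: `DeadlineSketch`, 
the method names, and the `cancelled` list are hypothetical, and the list only 
stands in for the real cancellation side effect. The comparison mirrors the 
`currentTime > flowStartTime + flowSla` check in the diff:

```java
import java.util.ArrayList;
import java.util.List;

final class DeadlineSketch {
  // Names of cancelled jobs; a stand-in for the real kill/cancel action.
  final List<String> cancelled = new ArrayList<>();

  // Pure check with no side effects: only reports whether the deadline has passed.
  static boolean isFlowCompletionDeadlineExceeded(long flowStartMillis, long flowSlaMillis, long nowMillis) {
    return nowMillis > flowStartMillis + flowSlaMillis;
  }

  // Enforcement: performs the action when the check fires and reports whether it acted.
  boolean enforceFlowCompletionDeadline(String jobName, long flowStartMillis, long flowSlaMillis, long nowMillis) {
    if (!isFlowCompletionDeadlineExceeded(flowStartMillis, flowSlaMillis, nowMillis)) {
      return false;
    }
    cancelled.add(jobName);
    return true;
  }
}
```

   Passing the current time in as a parameter also makes the check easy to unit 
test without depending on `System.currentTimeMillis()`.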



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up 
{@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED

Review Comment:
   nit: `ie:` or `including:`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));

Review Comment:
   The conditional above should be checked first, and this should be the else 
branch, right?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, 
maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, 
maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, 
delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);
+    }
+  }
+
+  protected final S initializeWithRetries(DagManagementStateStore 
dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws 
IOException {
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      try {
+        return this.initialize(dagManagementStateStore);
+      } catch (MaybeRetryableException e) {
+        if (retryCount < maxRetryCount - 1) { // Don't wait before the last 
retry
+          waitBeforeRetry(delayRetryMillis);
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    throw new RuntimeException("Max retry attempts reached. Cannot initialize 
Dag");
+  }
+
+  protected final R actWithRetries(S state, DagManagementStateStore 
dagManagementStateStore, int maxRetryCount, long delayRetryMillis) {

Review Comment:
   Should we have a generic method that handles retries and takes a function as 
a parameter? It looks like these two functions repeat the same code and are 
meant to be handled the same way.
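
   One possible shape for such a helper, as a sketch only: `RetrySketch`, 
`callWithRetries`, and the nested `RetryableException` are hypothetical names, 
with `RetryableException` standing in for the PR's `MaybeRetryableException`. 
The loop follows the same pattern as `initializeWithRetries` in the diff:

```java
import java.util.concurrent.Callable;

final class RetrySketch {
  // Hypothetical stand-in for the PR's MaybeRetryableException.
  static class RetryableException extends Exception {
    RetryableException(String message) {
      super(message);
    }
  }

  // Runs the action up to maxRetryCount times, retrying only on RetryableException.
  // Any other exception thrown by the action propagates to the caller immediately.
  static <T> T callWithRetries(Callable<T> action, int maxRetryCount, long delayRetryMillis) throws Exception {
    RetryableException last = null;
    for (int attempt = 0; attempt < maxRetryCount; attempt++) {
      try {
        return action.call();
      } catch (RetryableException e) {
        last = e;
        if (attempt < maxRetryCount - 1) { // don't wait after the final attempt
          Thread.sleep(delayRetryMillis);
        }
      }
    }
    throw new RuntimeException("Max retry attempts reached", last);
  }
}
```

   With something like this, `initializeWithRetries` and `actWithRetries` could 
each reduce to a single call that passes a lambda, for example 
`callWithRetries(() -> initialize(store), maxRetryCount, delayRetryMillis)`.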



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from 
to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+
+  @Getter
+  private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingDeque<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @Override
+  public boolean hasNext() {
+    return true;
+  }
+
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagActionStore.DagAction dagAction = dagActionQueue.peek();
+    try {
+      Preconditions.checkArgument(dagAction != null, "No Dag Action found in 
the queue");
+      Properties jobProps = getJobProperties(dagAction);
+      Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus> 
leaseObtainedStatus =
+          flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis()).toJavaUtil();
+      if(leaseObtainedStatus.isPresent()) {
+        DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+        return Optional.of(dagTask);
+      }
+    } catch (Exception ex) {
+      //TODO: need to handle exceptions gracefully
+      throw new RuntimeException(ex);
+    }
+    return Optional.empty();
+  }
+
+  public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  public DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  public DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long  
triggerTimeStamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(String flowGroup, String flowName, String 
flowExecutionId, long  triggerTimeStamp) throws IOException, 
InterruptedException {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(String flowGroup, String flowName, String 
flowExecutionId, long  produceTimestamp) throws IOException {
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup, 
flowName, flowExecutionId);
+    
if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString()))
 {
+      log.info("Invalid dag since not present in map. Hence cannot cancel it");
+      return;
+    }
+    DagActionStore.DagAction killAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
+      if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will try to cancel the job when SLA is reached.
+   *
+   * @param node dag node of the job
+   * @return true if the job is killed because it reached sla
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to distribute the responsibility outside of this class
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla;
+    if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+      flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+    } else {
+      try {
+        flowSla = DagManagerUtils.getFlowSLA(node);
+      } catch (ConfigException e) {
+        log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid 
format, using default SLA of {}",
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+            
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+            DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+        flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+      }
+      this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+    }
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      
this.dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      
this.dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow 
killed due to exceeding SLA of " + flowSla + " ms");
+
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Cancel the job if the job has been "orphaned". A job is orphaned if has 
been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED 
state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, 
JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    //TODO: need to distribute the responsibility outside of this class
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    //TODO: initialize default job sla in millis via configs
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, 
System.currentTimeMillis());
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - 
jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing 
the job now...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      String dagId = DagManagerUtils.generateDagId(node).toString();
+      
dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow 
killed because no update received for " + timeOutForJobStart + " ms after 
orchestration");
+      return true;
+    } else {
+      return false;
+    }
+
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+
+  protected JobStatus retrieveJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) 
{

Review Comment:
   +1. We can move these to `DagManagementStateStore` and call those methods 
if needed.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);
+
+  public Dag<JobExecutionPlan> getDag(String dagId);
+
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) 
throws IOException;
+
+  public boolean addFailedDagId(String dagId);

Review Comment:
   Does this add to the failedDagId store?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();

Review Comment:
   Can this method or the one above throw any exceptions?
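
   For reference, a small sketch of the relevant `BlockingQueue` behavior (the 
class and method names below are hypothetical). On a `LinkedBlockingQueue`, 
`offer` throws `NullPointerException` for a null element and returns false when 
a bounded queue is full; the untimed `poll()` never blocks and returns null when 
the queue is empty; the timed `poll` and `take()` can throw 
`InterruptedException`. Since `next()` in the diff passes the result of `take()` 
straight to `getJobProperties`, an empty queue would hand it null:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueSemanticsSketch {
  // Returns true if offer(null) was rejected with a NullPointerException.
  static boolean offerRejectsNull(BlockingQueue<String> queue) {
    try {
      queue.offer(null);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // Untimed poll never blocks; a null result means the queue was empty.
  static String pollNow(BlockingQueue<String> queue) {
    return queue.poll();
  }

  // Timed poll waits up to the timeout and may be interrupted while waiting.
  static String pollWithTimeout(BlockingQueue<String> queue, long timeoutMillis) throws InterruptedException {
    return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

   If the iterator is meant to wait for work, a blocking `take()` or a timed 
`poll` may be a better fit than the untimed `poll()`.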



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;

Review Comment:
   These three fields can be `final` as well.
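
   A rough sketch of the shape this suggests, assuming the three collaborators are only ever assigned in the constructor. `FinalFieldsSketch` and `Collaborator` are hypothetical placeholders for `DagTaskStream` and its dependency types, and the hand-written constructor stands in for the Lombok-generated one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FinalFieldsSketch {

  // Hypothetical placeholder for FlowTriggerHandler, DagManagementStateStore, DagManagerMetrics.
  static class Collaborator {
    final String name;

    Collaborator(String name) {
      this.name = name;
    }
  }

  // Already final in the PR: initialized inline, so it is not a constructor parameter.
  private final BlockingQueue<String> dagActionQueue = new LinkedBlockingQueue<>();

  // Marked final: each must be assigned exactly once, in the constructor.
  private final Collaborator flowTriggerHandler;
  private final Collaborator dagManagementStateStore;
  private final Collaborator dagManagerMetrics;

  // Hand-written stand-in for the all-args constructor.
  FinalFieldsSketch(Collaborator flowTriggerHandler, Collaborator dagManagementStateStore,
      Collaborator dagManagerMetrics) {
    this.flowTriggerHandler = flowTriggerHandler;
    this.dagManagementStateStore = dagManagementStateStore;
    this.dagManagerMetrics = dagManagerMetrics;
  }

  // Small accessor so the sketch has observable behavior.
  String describe() {
    return flowTriggerHandler.name + "," + dagManagementStateStore.name + ","
        + dagManagerMetrics.name + "," + dagActionQueue.size();
  }

  public static void main(String[] args) {
    FinalFieldsSketch stream = new FinalFieldsSketch(
        new Collaborator("handler"), new Collaborator("store"), new Collaborator("metrics"));
    System.out.println(stream.describe());
  }
}
```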



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;

Review Comment:
   These defaults can be declared here or in the configuration files above, as is 
usual with other configurations, but I'm less particular about it.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.

Review Comment:
   This sentence is a bit hard to read. How about `Maintains a stream of ... that ... it 
polls when ready for more work to process`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));

Review Comment:
   Good descriptive message. Let's add one detail: the flow exceeded the 
_completion SLA_, as opposed to the start SLA.
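
   A minimal sketch of the wording, assuming the deadline check stays `currentTime > flowStartTime + flowSla`. `CompletionSlaMessageSketch` and its method names are hypothetical, and `String.format` replaces the slf4j `log.info` call only to keep the sketch self-contained.

```java
public class CompletionSlaMessageSketch {

  // Same comparison as in the diff: the deadline is the flow start time plus the SLA.
  static boolean exceededCompletionSla(long flowStartTime, long flowSlaMillis, long currentTime) {
    return currentTime > flowStartTime + flowSlaMillis;
  }

  // Message that names the completion SLA explicitly, to distinguish it from the start SLA.
  static String message(String flowName, long flowSlaMillis, String jobName) {
    return String.format(
        "Flow %s exceeded the completion SLA of %d ms. Enforcing cancellation of the job %s ...",
        flowName, flowSlaMillis, jobName);
  }

  public static void main(String[] args) {
    // Illustrative values only; not taken from the PR.
    long flowStartTime = 0L;
    long flowSlaMillis = 1000L;
    long currentTime = 1001L;
    if (exceededCompletionSla(flowStartTime, flowSlaMillis, currentTime)) {
      System.out.println(message("exampleFlow", flowSlaMillis, "exampleJob"));
    }
  }
}
```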



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}

Review Comment:
   We can shorten this since we have the "@return true" below: just keep `via 
...` and join it with the sentence above.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";

Review Comment:
   Let's put these in `ServiceConfigKeys` or `ConfigurationKeys`. I also agree 
with kip's comment above that there should be a `dag_processing_prefix` used 
before them all.
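
   A sketch of how the prefixed keys could look. The class name and the prefix value `gobblin.service.dagProcessingEngine.` are assumptions for illustration, not the actual constants in `ServiceConfigKeys`. Note that the diff appends `"numThreads"` directly to the `..._ENABLED_KEY` constant with no separator, so the resulting key would end in the enabled key's last segment fused with `numThreads`.

```java
public class DagProcessingConfigKeysSketch {

  // Hypothetical shared prefix; the real value would be decided in ServiceConfigKeys.
  public static final String DAG_PROCESSING_ENGINE_PREFIX = "gobblin.service.dagProcessingEngine.";

  // Every key is derived from the prefix rather than from the "enabled" key.
  public static final String ENABLED_KEY = DAG_PROCESSING_ENGINE_PREFIX + "enabled";
  public static final String NUM_THREADS_KEY = DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
  public static final String MAX_RETRY_ATTEMPTS_KEY = DAG_PROCESSING_ENGINE_PREFIX + "maxRetryAttempts";
  public static final String RETRY_DELAY_MS_KEY = DAG_PROCESSING_ENGINE_PREFIX + "retryDelayMillis";

  public static void main(String[] args) {
    System.out.println(ENABLED_KEY);
    System.out.println(NUM_THREADS_KEY);
    // What concatenating onto the enabled key produces instead:
    System.out.println(ENABLED_KEY + "numThreads");
  }
}
```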



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.

Review Comment:
   remove `on`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would 
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+  @Getter
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+  private FlowTriggerHandler flowTriggerHandler;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+
+
+  @NotNull
+  @Override
+  public Iterator<DagTask> iterator() {
+    return new Iterator<DagTask>() {
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public DagTask next() {
+
+        DagTask dagTask = null;
+        while(true) {
+          DagActionStore.DagAction dagAction = take();
+          Properties jobProps = getJobProperties(dagAction);
+          try {
+            MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+            if(leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+              dagTask = createDagTask(dagAction,
+                  (MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus);
+            }
+            if (dagTask != null) {
+              break; // Exit the loop when dagTask is non-null
+            }
+          } catch (IOException e) {
+            //TODO: need to handle exceptions gracefully
+            throw new RuntimeException(e);
+          }
+        }
+        return dagTask;
+      }
+    };
+  }
+
+  private boolean add(DagActionStore.DagAction dagAction) {
+    return this.dagActionQueue.offer(dagAction);
+  }
+
+  private DagActionStore.DagAction take() {
+    return this.dagActionQueue.poll();
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = 
dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+        log.warn("It should not reach here. Yet to provide implementation.");
+        return null;
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  @Override
+  public void launchFlow(String flowGroup, String flowName, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently launch flow is not 
supported.");
+  }
+
+  @Override
+  public void resumeFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) {
+    //TODO: provide implementation after finalizing code flow
+    throw new UnsupportedOperationException("Currently resume flow is not 
supported.");
+  }
+
+  @Override
+  public void killFlow(DagActionStore.DagAction killAction, long 
eventTimestamp) throws IOException {
+     if(!add(killAction)) {
+        throw new IOException("Could not add kill dag action: " + killAction + 
" to the queue.");
+      }
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will enforce on cancelling the job when SLA is 
reached.
+   * It will return true and cancellation would be initiated via {@link 
org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+   * by creating {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: 
{@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+   * The Flow SLA will be set when the {@link Dag} is launched either via 
Scheduler or REST client
+   * @param node dag node of the job
+   * @return true if the job reached sla needs to be cancelled
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    //TODO: need to fetch timestamps from database when multi-active is enabled
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the 
job {} ...",
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
 flowSla,
+          
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Enforce cancel on the job if the job has been "orphaned". A job is 
orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.

Review Comment:
   Is this specific amount of time determined by config?
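
If the threshold is meant to be configurable, the orphan check could look roughly like the sketch below. It assumes the threshold is read from a `java.util.Properties` object. The class `OrphanCheckSketch`, the key `example.job.start.deadline.millis`, and the 10-minute default are all hypothetical and are not taken from Gobblin's `ConfigurationKeys`.

```java
import java.util.Properties;

// Hypothetical sketch: decides whether a job has been ORCHESTRATED for too long.
class OrphanCheckSketch {
  // Assumed key and default, invented for illustration only.
  static final String TIMEOUT_KEY = "example.job.start.deadline.millis";
  static final long DEFAULT_TIMEOUT_MILLIS = 10 * 60 * 1000L;

  // Returns true when the time spent in ORCHESTRATED exceeds the configured timeout.
  static boolean isOrphaned(Properties props, long orchestratedAtMillis, long nowMillis) {
    long timeoutMillis = Long.parseLong(
        props.getProperty(TIMEOUT_KEY, Long.toString(DEFAULT_TIMEOUT_MILLIS)));
    return nowMillis - orchestratedAtMillis > timeoutMillis;
  }
}
```

Falling back to a default keeps the check usable when the key is absent, and documenting the key in the Javadoc would answer the question for future readers.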



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+  private Optional<DagActionStore> dagActionStore;
+
+  @Override
+  public synchronized void deleteJobState(String dagId, 
Dag.DagNode<JobExecutionPlan> dagNode) {

Review Comment:
   This may throw an exception if the `dagNode` does not exist.
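
One way to guard against that is to treat an unknown `dagId` or `dagNode` as a no-op. The sketch below is a simplified stand-in, not the PR's implementation: `JobStateSketch` is a hypothetical class, and `String` is used in place of `Dag.DagNode<JobExecutionPlan>` to keep it self-contained.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical stand-in for the in-memory store; String replaces Dag.DagNode<JobExecutionPlan>.
class JobStateSketch {
  // Maps a dagId to the running jobs of that dag.
  private final Map<String, LinkedList<String>> dagToJobs = new HashMap<>();

  synchronized void addJobState(String dagId, String dagNode) {
    dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(dagNode);
  }

  // Returns true only if the node was tracked; unknown dagIds or nodes are a no-op.
  synchronized boolean deleteJobState(String dagId, String dagNode) {
    LinkedList<String> jobs = dagToJobs.get(dagId);
    if (jobs == null) {
      return false;
    }
    boolean removed = jobs.remove(dagNode);
    if (jobs.isEmpty()) {
      dagToJobs.remove(dagId); // drop the empty list so the map does not grow without bound
    }
    return removed;
  }

  synchronized boolean hasRunningJobs(String dagId) {
    LinkedList<String> jobs = dagToJobs.get(dagId);
    return jobs != null && !jobs.isEmpty();
  }
}
```

Returning a boolean lets the caller decide whether a missing node is worth logging, instead of failing with a `NullPointerException`.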



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/AdvanceDagProc.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} dealing which advancing to the next 
node in the {@link Dag}.

Review Comment:
   `dealing WITH`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information 
about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their 
job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements 
DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new 
HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();

Review Comment:
   `dagIdToSLA/Deadline`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up 
{@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED
+ *
+ */
+@Slf4j
+@Alpha
+public class CleanUpDagProc extends DagProc {
+
+  private CleanUpDagTask cleanUpDagTask;
+
+  public CleanUpDagProc(CleanUpDagTask cleanUpDagTask) {
+    this.cleanUpDagTask = cleanUpDagTask;
+  }
+
+  @Override
+  protected Object initialize(DagManagementStateStore dagManagementStateStore) 
throws MaybeRetryableException, IOException {
+    throw new UnsupportedOperationException("Not supported");
+

Review Comment:
   extra space



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given 
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private JobStatusRetriever jobStatusRetriever;
+  private FlowStatusGenerator flowStatusGenerator;
+  private UserQuotaManager quotaManager;
+  private SpecCompiler specCompiler;
+  private FlowCatalog flowCatalog;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private Config config;
+  private Optional<EventSubmitter> eventSubmitter;
+  private boolean instrumentationEnabled;
+

Review Comment:
   Since the previous revision already attempts to initialize this with Guice,
let's also add the `@Inject` annotation to the constructor.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long 
delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a 
reminder trigger to check back again.

Review Comment:
   let's update this in the next iteration



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states

Review Comment:
   Add more description, e.g. `for a Dag, allows one to update or extract its 
job state or ...`, to give a high-level overview.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config 
config, DagActionStore d
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    // instantiating using default ctor; subsequent PR will handle 
instantiating with multi-args ctor
+//    this.dagTaskStream = new DagTaskStream();

Review Comment:
   Yeah, we should use Guice to bring it in; see the 
`DagActionStoreChangeMonitorFactory`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
           this.resumesInvoked.mark();
         } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));

Review Comment:
   Or are we trying a "dummy kill" in the new refactor? If so, let's add a 
comment that the change below doesn't carry out the action and is still in 
testing.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via 
REST client for adhoc flows.
+   * The eventTimestamp for adhoc flows will always be 0 (zero), while -1 
would indicate failures.
+   * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+   * Future implementations will cover launch of flows through the scheduler 
too!

Review Comment:
   Agree with Kip's suggestion here to make it read like an interface 
   "Handles launching of ... via ..." 
   @param followed by requirements
   @throws if input doesn't match
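
A sketch of what that could look like, using the constraints already stated in the quoted Javadoc (0 for ad hoc flows, -1 for failures, valid launches need `eventTimestamp >= 0`). `LaunchSketch` and `ValidatingLaunchSketch` are hypothetical names, and throwing `IllegalArgumentException` is an assumption; the PR may prefer a different exception type.

```java
// Hypothetical interface showing the suggested Javadoc shape.
interface LaunchSketch {
  /**
   * Handles launching of a flow requested via the REST client for ad hoc flows.
   *
   * @param flowGroup group of the flow to launch; must not be null
   * @param flowName name of the flow to launch; must not be null
   * @param eventTimestamp trigger timestamp; must be >= 0 (0 for ad hoc flows)
   * @throws IllegalArgumentException if a name is null or eventTimestamp is negative
   */
  void launchFlow(String flowGroup, String flowName, long eventTimestamp);
}

// Minimal implementation that enforces the documented requirements.
class ValidatingLaunchSketch implements LaunchSketch {
  int launched = 0; // counts accepted launch requests, for illustration only

  @Override
  public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
    if (flowGroup == null || flowName == null) {
      throw new IllegalArgumentException("flowGroup and flowName must not be null");
    }
    if (eventTimestamp < 0) {
      throw new IllegalArgumentException("eventTimestamp must be >= 0 but was " + eventTimestamp);
    }
    launched++;
  }
}
```

Stating the requirements under `@param` and the failure mode under `@throws` keeps the contract in one place, so implementations do not have to restate it.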



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
