phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1326634914


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AdvanceDagProc.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link DagProc} dealing with advancing to the next node in the {@link Dag}.
+ * This Dag Procedure will deal with pending Job statuses such as: PENDING, PENDING_RESUME, PENDING_RETRY
+ * as well as jobs that have reached an end state with statuses such as: COMPLETED, FAILED and CANCELLED.
+ * Primarily, it will be responsible for polling the flow and job statuses
+ */
+@Slf4j
+@WorkInProgress

Review Comment:
   I'm unfamiliar w/ this one... and don't see where it's imported from



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AdvanceDagProc.java:
##########
@@ -0,0 +1,233 @@
+import com.google.common.base.Optional;

Review Comment:
   since this is new code, let's opt for `java.util.Optional`; the Guava docs 
recommend preferring it over the guava one wherever possible
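
   A minimal sketch of what the switch looks like at call sites, assuming a 
hypothetical `OptionalSketch` holder (not a class from this PR). The mapping 
used is `fromNullable` -> `ofNullable`, `absent()` -> `empty()` and 
`or(x)` -> `orElse(x)`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical holder used only to illustrate java.util.Optional call sites. */
final class OptionalSketch {
  private final Map<String, String> store = new HashMap<>();

  void put(String key, String value) {
    store.put(key, value);
  }

  // Optional.ofNullable(...) takes the place of Guava's Optional.fromNullable(...);
  // a missing key yields Optional.empty() rather than Guava's Optional.absent().
  Optional<String> find(String key) {
    return Optional.ofNullable(store.get(key));
  }

  // orElse(...) takes the place of Guava's or(...).
  String findOrDefault(String key, String fallback) {
    return find(key).orElse(fallback);
  }
}
```

   Fields such as `Optional<DagActionStore>` would change only their import and 
these few method names.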



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagStateStore} to provide information about dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions need to be
+ * taken next, such as marking it as complete, failed or SLA-breached, or simply cleaning up after completion.
+ * Going forward, each of these in-memory references will be read from and written to a MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate independently.
+ */
+@Getter
+@WorkInProgress
+public class DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();

Review Comment:
   same here: what's the key?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AdvanceDagProc.java:
##########
@@ -0,0 +1,233 @@
+@Slf4j
+@WorkInProgress
+public class AdvanceDagProc extends DagProc {
+  private Optional<DagActionStore> dagActionStore;
+  private Optional<EventSubmitter> eventSubmitter;
+  private DagStateStore dagStateStore;
+  private MetricContext metricContext;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+  private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  private UserQuotaManager quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+      ConfigUtils.getString(ConfigBuilder.create().build(), ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+      ConfigBuilder.create().build());
+
+  public AdvanceDagProc() throws IOException {
+    //TODO: add this to dagproc factory instead
+    this.dagManagementStateStore = new DagManagementStateStore();
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.multiActiveLeaseArbiter = new MysqlMultiActiveLeaseArbiter(ConfigBuilder.create().build());
+    this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
+
+  }
+  @Override
+  protected Object initialize() {
+    return null;
+  }
+
+  @Override
+  protected Object act(Object state) throws ExecutionException, InterruptedException, IOException {
+    return null;
+  }
+
+  @Override
+  protected void sendNotification(Object result) throws MaybeRetryableException {
+
+  }
+  private void initialize(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
+    if (this.dagManagementStateStore.getDags().containsKey(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
+      return;
+    }
+
+    this.dagManagementStateStore.getDags().put(dagId, dag);
+    log.debug("Dag {} - determining if any jobs are already running.", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+        this.dagManagementStateStore.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        dagManagerMetrics.incrementRunningJobMetrics(dagNode);
+        isDagRunning = true;
+      }
+    }
+
+    FlowId flowId = DagManagerUtils.getFlowId(dag);
+    this.dagManagerMetrics.registerFlowMetric(flowId, dag);
+
+    log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
+    //Determine the next set of jobs to run and submit them for execution
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
+    for (Dag.DagNode<JobExecutionPlan> dagNode: nextSubmitted.get(dagId)) {
+      this.dagManagementStateStore.addJobState(dagId, dagNode);
+    }
+
+    // Set flow status to running
+    DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+    dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.RUNNING);
+
+    // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
+    // the time difference between the instant when a flow first transitions to the running state and the instant
+    // when the flow is submitted to Gobblin service.
+    if (!isDagRunning) {
+      //TODO: need to set orchestration delay
+//      this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
+    }
+
+    log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
+  }
+  /**

Review Comment:
   both this and the method above need a blank line separating them from the 
end of the one prior



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.

Review Comment:
   I might rephrase to "...that the DM would pull from to process, as it is 
ready"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new LinkedBlockingDeque<>();

Review Comment:
   I'm not sure it's appropriate for callers to get a hold of the queue 
directly (or even find out that it is backed by a queue).  
   
   instead provide encapsulating methods here to "add" and to "take" a `DagTask`
   
   final nit: it's confusing to name a member of a class after the class 
itself; here I'd prefer `taskQueue`
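
   A rough sketch of the encapsulation I have in mind, assuming hypothetical 
method names (`addTask`, `takeTask`, `pollTask`) and a generic element type 
standing in for `DagTask`. Callers never see the deque itself:

```java
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for DagTaskStream; T stands in for DagTask. */
final class TaskStreamSketch<T> {
  // Private and without a getter: the backing collection is an implementation detail.
  private final BlockingDeque<T> taskQueue = new LinkedBlockingDeque<>();

  /** Enqueues a task at the tail of the queue. */
  void addTask(T task) {
    taskQueue.add(task);
  }

  /** Blocks until a task is available, then removes and returns the head. */
  T takeTask() throws InterruptedException {
    return taskQueue.take();
  }

  /** Waits up to the timeout for a task; empty if none arrived in time. */
  Optional<T> pollTask(long timeout, TimeUnit unit) throws InterruptedException {
    return Optional.ofNullable(taskQueue.poll(timeout, unit));
  }
}
```

   With this shape the backing structure could later be swapped without 
touching any caller.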



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@WorkInProgress
+public interface DagManagement {
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via REST client for adhoc flows
+   * @param launchDagTask
+   */
+  void launchFlow(LaunchDagTask launchDagTask);
+  /**
+   * Currently, it is handling just the resume of a {@link Dag} request via REST client for adhoc flows
+   * @param resumeDagTask
+   */
+  void resumeFlow(ResumeDagTask resumeDagTask) throws IOException;

Review Comment:
   e.g. should this and kill take a `DagAction` as their param?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Optional.absent();
+  }
+
+  @Override
+  public void launchFlow(LaunchDagTask launchDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();

Review Comment:
   doesn't the trigger timestamp come from the scheduler?  I wouldn't think 
we're free to make up our own here



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +313,46 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.asUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link DagTask} through lease arbitration
+   * @param dagTask for which we want to attempt to acquire a lease
+   * @return true if successfully acquired lease, or else false

Review Comment:
   as mentioned already, the `DagTask` must hold on to its specific 
`LeaseObtainedStatus`
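
   To illustrate (a sketch only; `Lease`, `Arbiter` and `Task` below are 
hypothetical stand-ins, not the real `LeaseObtainedStatus`, 
`MultiActiveLeaseArbiter` or `DagTask` signatures): rather than returning a 
bare boolean, the acquisition stores the granted lease on the task, so the 
same object can be handed back when the work concludes:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class LeaseSketch {
  /** Hypothetical stand-in for LeaseObtainedStatus: identifies one granted lease. */
  static final class Lease {
    final String dagId;
    final long acquiredAtMillis;

    Lease(String dagId, long acquiredAtMillis) {
      this.dagId = dagId;
      this.acquiredAtMillis = acquiredAtMillis;
    }
  }

  /** Hypothetical in-memory arbiter: at most one outstanding lease per dag id. */
  static final class Arbiter {
    private final Set<String> leased = new HashSet<>();

    Optional<Lease> tryAcquire(String dagId, long nowMillis) {
      return leased.add(dagId) ? Optional.of(new Lease(dagId, nowMillis)) : Optional.empty();
    }

    boolean recordCompletion(Lease lease) {
      return leased.remove(lease.dagId);
    }
  }

  /** Hypothetical task that keeps the specific lease it was granted. */
  static final class Task {
    final String dagId;
    private Lease lease; // null until a lease has been granted

    Task(String dagId) {
      this.dagId = dagId;
    }

    boolean acquireLease(Arbiter arbiter, long nowMillis) {
      Optional<Lease> granted = arbiter.tryAcquire(dagId, nowMillis);
      granted.ifPresent(l -> this.lease = l);
      return granted.isPresent();
    }

    /** Hands the retained lease back to the arbiter; false if none was held. */
    boolean conclude(Arbiter arbiter) {
      if (lease == null) {
        return false;
      }
      boolean recorded = arbiter.recordCompletion(lease);
      lease = null;
      return recorded;
    }
  }
}
```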



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@WorkInProgress
+public final class KillDagProc extends DagProc {
+
+  private DagManager.DagId killDagId;
+  private DagManagementStateStore dagManagementStateStore;
+  private MetricContext metricContext;
+  private Optional<EventSubmitter> eventSubmitter;
+  private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+
+  public KillDagProc(DagManager.DagId killDagId) throws IOException {
+    this.killDagId = killDagId;
+    //TODO: add this to dagproc factory
+    this.dagManagementStateStore = new DagManagementStateStore();

Review Comment:
   yes, either this should be passed by the DM to the `DagProcFactory` who 
passes it into this ctor... or perhaps better yet, the DM would pass it to 
`process`, who would make it available internally to `initialize` and `act`
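
   A minimal sketch of the second option, assuming a simplified 
two-step template (the real `DagProc` also has `sendNotification`) and 
hypothetical `StateStore`/`KillProc` stand-ins. The caller owns the single 
store instance and hands it to `process`:

```java
import java.util.HashMap;
import java.util.Map;

final class ProcInjectionSketch {
  /** Hypothetical stand-in for DagManagementStateStore. */
  static final class StateStore {
    private final Map<String, String> dags = new HashMap<>();

    void addDag(String dagId, String dag) {
      dags.put(dagId, dag);
    }

    boolean removeDag(String dagId) {
      return dags.remove(dagId) != null;
    }

    boolean containsDag(String dagId) {
      return dags.containsKey(dagId);
    }
  }

  /** Template method: process() receives the shared store and passes it to each step. */
  abstract static class Proc<S, R> {
    final R process(StateStore store) {
      S state = initialize(store);
      return act(state, store);
    }

    protected abstract S initialize(StateStore store);

    protected abstract R act(S state, StateStore store);
  }

  /** Hypothetical kill procedure that never constructs its own store. */
  static final class KillProc extends Proc<Boolean, String> {
    private final String dagId;

    KillProc(String dagId) {
      this.dagId = dagId;
    }

    @Override
    protected Boolean initialize(StateStore store) {
      return store.containsDag(dagId);
    }

    @Override
    protected String act(Boolean tracked, StateStore store) {
      if (!tracked) {
        return "not tracked: " + dagId;
      }
      store.removeDag(dagId);
      return "killed: " + dagId;
    }
  }
}
```

   Every proc then observes the same state, which a `new 
DagManagementStateStore()` per ctor cannot give us.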



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AdvanceDagProc.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link DagProc} dealing which advancing to the next 
node in the {@link Dag}.
+ * This Dag Procedure will deal with pending Job statuses such as: PENDING, 
PENDING_RESUME, PENDING_RETRY
+ * as well jobs that have reached an end state with statuses such as: 
COMPLETED, FAILED and CANCELLED.
+ * Primarily, it will be responsible for polling the flow and job statuses
+ */
+@Slf4j
+@WorkInProgress
+public class AdvanceDagProc extends DagProc {
+  private Optional<DagActionStore> dagActionStore;
+  private Optional<EventSubmitter> eventSubmitter;
+  private DagStateStore dagStateStore;
+  private MetricContext metricContext;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+  private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  private UserQuotaManager quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+      ConfigUtils.getString(ConfigBuilder.create().build(), 
ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+      ConfigBuilder.create().build());
+
+  public AdvanceDagProc() throws IOException {
+    //TODO: add this to dagproc factory instead
+    this.dagManagementStateStore = new DagManagementStateStore();
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.multiActiveLeaseArbiter = new 
MysqlMultiActiveLeaseArbiter(ConfigBuilder.create().build());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
+
+  }
+  @Override
+  protected Object initialize() {
+    return null;
+  }
+
+  @Override
+  protected Object act(Object state) throws ExecutionException, 
InterruptedException, IOException {
+    return null;
+  }
+
+  @Override
+  protected void sendNotification(Object result) throws 
MaybeRetryableException {
+
+  }
+  private void initialize(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+    if (this.dagManagementStateStore.getDags().containsKey(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
+      return;
+    }
+
+    this.dagManagementStateStore.getDags().put(dagId, dag);
+    log.debug("Dag {} - determining if any jobs are already running.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags 
already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+        this.dagManagementStateStore.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        dagManagerMetrics.incrementRunningJobMetrics(dagNode);
+        isDagRunning = true;
+      }
+    }
+
+    FlowId flowId = DagManagerUtils.getFlowId(dag);
+    this.dagManagerMetrics.registerFlowMetric(flowId, dag);
+
+    log.debug("Dag {} submitting jobs ready for execution.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+    //Determine the next set of jobs to run and submit them for execution
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = 
submitNext(dagId);
+    for (Dag.DagNode<JobExecutionPlan> dagNode: nextSubmitted.get(dagId)) {
+      this.dagManagementStateStore.addJobState(dagId, dagNode);
+    }
+
+    // Set flow status to running
+    DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_RUNNING);
+    dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
DagManager.FlowState.RUNNING);
+
+    // Report the orchestration delay the first time the Dag is initialized. 
Orchestration delay is defined as
+    // the time difference between the instant when a flow first transitions 
to the running state and the instant
+    // when the flow is submitted to Gobblin service.
+    if (!isDagRunning) {
+      //TODO: need to set orchestration delay
+//      this.orchestrationDelay.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag));
+    }
+
+    log.info("Dag {} Initialization complete.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+  }
+  /**
+   * Submit next set of Dag nodes in the Dag identified by the provided dagId
+   * @param dagId The dagId that should be processed.
+   * @return
+   * @throws IOException
+   */
+  synchronized Map<String, Set<Dag.DagNode<JobExecutionPlan>>> 
submitNext(String dagId) throws IOException {

Review Comment:
   this looks like the only `synchronized` method of the class... is there risk 
of this one particular method being called simultaneously?
   
   also... why package protected access level?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +148,26 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
+

Review Comment:
   this blank line snuck in



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} to for its 
processing.
+ * Upon completion of the {@link DagProc#process()} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} 
as complete
+ * @param <T>
+ */
+@WorkInProgress
+public abstract class DagTask<T> {
+
+  protected Properties jobProps;
+  protected DagActionStore.DagAction flowAction;
+  protected long triggerTimeStamp;
+
+  protected MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+  abstract void initialize(Object state, long triggerTimeStamp);
+
+  /**
+   * Currently, I don't see any need for having to mark conclusion of {@link 
DagTask}.
+   * Each task submits an event after processing, resulting in change in 
status for that job.
+   *
+   */
+  abstract void conclude();

Review Comment:
   that's true, the event would have been sent to update state, but there's 
still the need to `recordLeaseSuccess`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagStateStore} to provide information about 
dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about 
dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter
+@WorkInProgress
+public class DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();

Review Comment:
   best if you can work the key into the name



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagStateStore} to provide information about 
dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about 
dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link 
Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply 
clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from 
MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate 
independently.
+ */
+@Getter
+@WorkInProgress
+public class DagManagementStateStore {
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> 
jobToDag = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+  private final Set<String> failedDagIds = new HashSet<>();
+  private final Map<String, Dag<JobExecutionPlan>> resumingDags = new 
HashMap<>();
+  // dagToJobs holds a map of dagId to running jobs of that dag
+  final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  private final Set<String> dagIdstoClean = new HashSet<>();
+  private Optional<DagActionStore> dagActionStore;
+
+  protected void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode) {

Review Comment:
   these methods should likely be `synchronized`, to guard against in-flux 
updates, where some collections have been changed, but not yet all
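   
   to illustrate, a simplified sketch under stated assumptions: `SyncStoreSketch` is hypothetical, `String` stands in for `Dag` and `Dag.DagNode`, and only two of the collections are modelled.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical, simplified store: String stands in for Dag / Dag.DagNode.
class SyncStoreSketch {
  private final Map<String, String> jobToDag = new HashMap<>();
  private final Map<String, LinkedList<String>> dagToJobs = new HashMap<>();

  // Both maps change under one lock, so no other synchronized caller
  // observes the state between the first update and the second.
  synchronized void addJobState(String dagId, String job) {
    jobToDag.put(job, dagId);
    dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(job);
  }

  synchronized void deleteJobState(String dagId, String job) {
    jobToDag.remove(job);
    LinkedList<String> jobs = dagToJobs.get(dagId);
    if (jobs != null) {
      jobs.remove(job);
      if (jobs.isEmpty()) {
        dagToJobs.remove(dagId);
      }
    }
  }

  // True when every job appears in both collections.
  synchronized boolean isConsistent() {
    int listed = 0;
    for (LinkedList<String> jobs : dagToJobs.values()) {
      listed += jobs.size();
    }
    return listed == jobToDag.size();
  }

  synchronized int jobCount() {
    return jobToDag.size();
  }
}
```

   note the lock only helps callers that go through these methods; a getter that hands out the raw map would bypass it.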



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} to for its 
processing.
+ * Upon completion of the {@link DagProc#process()} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} 
as complete
+ * @param <T>
+ */
+@WorkInProgress
+public abstract class DagTask<T> {
+
+  protected Properties jobProps;
+  protected DagActionStore.DagAction flowAction;
+  protected long triggerTimeStamp;
+
+  protected MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;

Review Comment:
   since AFAIU a lease was successfully obtained, not merely tried for, the 
type here should specifically be `LeaseObtainedStatus`.
   
   anyway, it should probably be `private`, given via a ctor that derived 
classes would invoke via `super`, and be used only within the `conclude` 
method, which should be implemented in this base class (not `abstract`)
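   
   roughly what I have in mind, as a sketch only: `LeaseArbiterSketch` and `LeaseObtainedSketch` are hypothetical stand-ins for the real `MultiActiveLeaseArbiter` types, and passing the arbiter into `conclude` is my assumption about how the task would reach it.

```java
// Hypothetical stand-ins; the real types live in MultiActiveLeaseArbiter.
interface LeaseArbiterSketch {
  void recordLeaseSuccess(LeaseObtainedSketch status);
}

final class LeaseObtainedSketch {
  final String flowAction;

  LeaseObtainedSketch(String flowAction) {
    this.flowAction = flowAction;
  }
}

abstract class LeasedDagTask {
  // private: derived classes never read or reassign the lease
  private final LeaseObtainedSketch leaseObtainedStatus;

  protected LeasedDagTask(LeaseObtainedSketch leaseObtainedStatus) {
    this.leaseObtainedStatus = leaseObtainedStatus;
  }

  // Implemented once in the base class, so every task completes its lease the same way.
  public final void conclude(LeaseArbiterSketch arbiter) {
    arbiter.recordLeaseSuccess(leaseObtainedStatus);
  }
}

final class KillTaskSketch extends LeasedDagTask {
  KillTaskSketch(LeaseObtainedSketch status) {
    super(status); // the lease is handed to the base class via `super`
  }
}
```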



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;

Review Comment:
   `java.util.Optional`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@WorkInProgress
+public abstract class DagProc<S, R> {
+  abstract protected S initialize() throws MaybeRetryableException;
+  abstract protected R act(S state) throws ExecutionException, 
InterruptedException, IOException;
+  abstract protected void sendNotification(R result) throws 
MaybeRetryableException;
+
+  void process(MultiActiveLeaseArbiter.LeaseAttemptStatus leaseStatus) {
+  throw new UnsupportedOperationException(" Process unsupported");

Review Comment:
   I'm thinking this is where you'd call `initialize`, `act` and 
`sendNotification`... is that correct?
   
   that's to say `process` is the general skeleton that holds for all 
`DagProc`s.  it handles retries, where appropriate, and sends the notification. 
 it can be implemented herein and even, I believe, be `final`.
   
   it might return a boolean indicating success/failure.  when it succeeds, the 
next step would be to complete the `DagTask` it was made from.  I don't believe 
`process` would need to take a `LeaseAttemptStatus` or even know anything about 
lease arbitration.  instead it would potentially be invoked w/ the DMStateStore
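   
   a sketch of that skeleton, with assumptions called out: `RetryableSketchException` stands in for `MaybeRetryableException`, the retry budget of 3 is invented, and retrying the whole sequence (rather than a single step) is a simplification.

```java
// Hypothetical stand-in for MaybeRetryableException from the PR.
class RetryableSketchException extends Exception {
  RetryableSketchException(String msg) {
    super(msg);
  }
}

abstract class TemplateDagProc<S, R> {
  private static final int MAX_ATTEMPTS = 3; // assumed retry budget, not from the PR

  protected abstract S initialize() throws RetryableSketchException;
  protected abstract R act(S state) throws RetryableSketchException;
  protected abstract void sendNotification(R result) throws RetryableSketchException;

  // final: every DagProc follows the same initialize -> act -> notify skeleton.
  public final boolean process() {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        S state = initialize();
        R result = act(state);
        sendNotification(result);
        return true; // the caller may now conclude the DagTask
      } catch (RetryableSketchException e) {
        // swallow and retry the whole sequence on the next loop iteration
      }
    }
    return false;
  }
}

// Example proc whose first initialize() call fails transiently.
class FlakyProcSketch extends TemplateDagProc<Integer, String> {
  int initCalls = 0;
  String notified = null;

  @Override
  protected Integer initialize() throws RetryableSketchException {
    if (++initCalls == 1) {
      throw new RetryableSketchException("transient failure");
    }
    return initCalls;
  }

  @Override
  protected String act(Integer state) {
    return "advanced-after-" + state + "-attempts";
  }

  @Override
  protected void sendNotification(String result) {
    notified = result;
  }
}
```

   nothing in `process` mentions leases; whoever calls it would use the boolean to decide whether to complete the `DagTask`.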



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();

Review Comment:
   all that aside, where are all these `DagTask`s coming from that we have a 
queue of them?  I thought this `DagTaskStream` would create all of them itself, 
internally, given that a `DagTask` must hold a `LeaseObtainedStatus` 
(confirming that it's ready to be worked on locally).  lease acquisition in 
turn should be done just before the DagMgr gets the task... which is when it's 
taken from this stream
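   
   a rough sketch of the flow I expected, assuming the stream queues pending actions rather than tasks: all names here (`DagActionSketch`, `LeaseSketch`, `LeasedTaskSketch`, `DagTaskStreamSketch`) are hypothetical, and the `Function` stands in for the arbiter's lease attempt.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical stand-ins for DagAction, LeaseObtainedStatus and DagTask.
final class DagActionSketch {
  final String flowId;

  DagActionSketch(String flowId) {
    this.flowId = flowId;
  }
}

final class LeaseSketch {
  final DagActionSketch action;

  LeaseSketch(DagActionSketch action) {
    this.action = action;
  }
}

final class LeasedTaskSketch {
  final LeaseSketch lease; // a task only exists once a lease was obtained

  LeasedTaskSketch(LeaseSketch lease) {
    this.lease = lease;
  }
}

class DagTaskStreamSketch implements Iterator<Optional<LeasedTaskSketch>> {
  // The queue holds pending actions, not tasks.
  private final Queue<DagActionSketch> pendingActions = new ArrayDeque<>();
  private final Function<DagActionSketch, Optional<LeaseSketch>> tryAcquireLease;

  DagTaskStreamSketch(Function<DagActionSketch, Optional<LeaseSketch>> tryAcquireLease) {
    this.tryAcquireLease = tryAcquireLease;
  }

  void addAction(DagActionSketch action) {
    pendingActions.add(action);
  }

  @Override
  public boolean hasNext() {
    return !pendingActions.isEmpty();
  }

  // Lease acquisition happens here, at the moment the task is taken from the stream.
  // Empty means the lease was not obtained; remove() throws NoSuchElementException if nothing is pending.
  @Override
  public Optional<LeasedTaskSketch> next() {
    DagActionSketch action = pendingActions.remove();
    return tryAcquireLease.apply(action).map(LeasedTaskSketch::new);
  }
}
```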



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} to for its 
processing.
+ * Upon completion of the {@link DagProc#process()} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} 
as complete
+ * @param <T>
+ */
+@WorkInProgress
+public abstract class DagTask<T> {
+
+  protected Properties jobProps;
+  protected DagActionStore.DagAction flowAction;
+  protected long triggerTimeStamp;
+
+  protected MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+  abstract void initialize(Object state, long triggerTimeStamp);
+
+  /**
+   * Currently, I don't see any need for having to mark conclusion of {@link 
DagTask}.
+   * Each task submits an event after processing, resulting in change in 
status for that job.
+   *
+   */
+  abstract void conclude();
+  abstract DagProc host(DagTaskVisitor<T> visitor) throws IOException, 
InstantiationException, IllegalAccessException;

Review Comment:
   this and `conclude` should be `public`... perhaps `initialize` too (not 
clear yet how you're using that)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +313,46 @@ protected static long getUTCTimeFromDelayPeriod(long 
delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.asUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link DagTask} through lease 
arbitration
+   * @param dagTask for which we want to attempt to acquire a lease
+   * @return true if successfully acquired lease, or else false
+   * @throws IOException
+   */
+  public boolean attemptDagTaskLeaseAcquisition(DagTask dagTask) throws 
IOException {
+    if (multiActiveLeaseArbiter.isPresent()) {
+      Properties jobProps = dagTask.jobProps;
+      DagActionStore.DagAction flowAction = dagTask.flowAction;
+      long eventTimeMillis = dagTask.triggerTimeStamp;
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      dagTask.leaseAttemptStatus = leaseAttemptStatus;
+      if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+        this.leaseObtainedCount.inc();
+        // If persisting the flow action failed, then we set another trigger 
for this event to occur immediately to

Review Comment:
   sorry, confused:
   are we setting this to fire again *immediately* (even though we just 
succeeded in getting the lease)?
   
   or are we setting a reminder in case we ultimately fail to persist the flow 
action... which we're right now about to begin attempting to process?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/WorkInProgress.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Custom Annotation for classes that are under development.
+ * It will make the classes available only during compilation phase, and not 
in the build.
+ */

Review Comment:
   doesn't guava have a `@Beta` annotation (and gobblin its own `@Alpha`) to convey similar?
   
   I'm not fully clear there's a need to prevent classes w/ this annotation 
from going into the build.  please justify if you perceive a risk we should 
work to avoid.
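   
   for reference, a small sketch of what retention does and doesn't control. I can't see the policy `WorkInProgress` declares in this hunk, so `SourceOnlyMarker` and `RuntimeMarker` below are hypothetical; the point is that retention governs whether the annotation survives, while the annotated class is compiled and packaged either way.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker discarded by the compiler: only the annotation is dropped.
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.TYPE)
@interface SourceOnlyMarker {}

// Hypothetical marker kept in the class file and visible via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RuntimeMarker {}

@SourceOnlyMarker
@RuntimeMarker
class UnfinishedFeature {
  // The class itself is compiled and loadable regardless of either marker.
  static String describe() {
    return "still compiled and loadable";
  }
}
```

   keeping such classes out of the artifact would need build-level exclusion instead, if that's really the goal.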



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@WorkInProgress
+public interface DagManagement {
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via 
REST client for adhoc flows
+   * @param launchDagTask
+   */
+  void launchFlow(LaunchDagTask launchDagTask);

Review Comment:
   I was not expecting that a caller asking us to launch a flow (or issuing any
   of the other commands here) would itself hold a `DagTask`.  instead I presumed
   callers would hold the domain concepts they're asking us to work with, e.g.
   they might say `launchFlow(FlowExecutionId, triggerTimeEpochMillis)`
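   
   a rough sketch of that shape, assuming a simple value type for the flow
   execution ID.  every name below (`FlowExecutionIdSketch`, `LaunchTaskSketch`,
   `DagManagementSketch`, `InMemoryDagManagement`) is hypothetical and not an
   existing gobblin class; the point is only that the task gets created
   internally, not by the caller:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: the domain concept a caller would actually hold.
final class FlowExecutionIdSketch {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;

  FlowExecutionIdSketch(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

// Hypothetical light-weight task, created internally rather than by the caller.
final class LaunchTaskSketch {
  final FlowExecutionIdSketch id;
  final long triggerTimeEpochMillis;

  LaunchTaskSketch(FlowExecutionIdSketch id, long triggerTimeEpochMillis) {
    this.id = id;
    this.triggerTimeEpochMillis = triggerTimeEpochMillis;
  }
}

// The caller-facing API speaks only in domain terms.
interface DagManagementSketch {
  void launchFlow(FlowExecutionIdSketch id, long triggerTimeEpochMillis);
}

// Assumed in-memory implementation: it turns the request into a task itself.
final class InMemoryDagManagement implements DagManagementSketch {
  final List<LaunchTaskSketch> pending = new ArrayList<>();

  @Override
  public void launchFlow(FlowExecutionIdSketch id, long triggerTimeEpochMillis) {
    pending.add(new LaunchTaskSketch(id, triggerTimeEpochMillis));
  }
}
```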



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new 
FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the 
subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);

Review Comment:
   what are we thinking here?  clearly a single error should not crash our
   entire DAG-handling subsystem...
   
   would the caller catch this and log a message?
   
   should we instead log the message internally and then proceed to the next
   task?
   
   perhaps we should provide some kind of `handleException(IOException ioe)`
   hook so the behavior need not be hard-coded inside this class?
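   
   one possible shape for such a hook, as a sketch only.  `HookedTaskStream`
   and `LeaseArbiter` are hypothetical stand-ins (tasks are plain strings, and
   `java.util.Optional` replaces guava's to keep it self-contained); what
   happens to a task whose lease check failed is left open here:

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

final class HookedTaskStream {
  // Hypothetical stand-in for the lease check, which may fail with an IOException.
  interface LeaseArbiter {
    boolean tryAcquire(String task) throws IOException;
  }

  private final BlockingDeque<String> tasks = new LinkedBlockingDeque<>();
  private final LeaseArbiter arbiter;
  private final Consumer<IOException> exceptionHandler;

  HookedTaskStream(LeaseArbiter arbiter, Consumer<IOException> exceptionHandler) {
    this.arbiter = arbiter;
    this.exceptionHandler = exceptionHandler;
  }

  void add(String task) {
    tasks.add(task);
  }

  // Returns the next task if one is queued and its lease was acquired.
  // An IOException is routed to the hook instead of escaping to the caller.
  Optional<String> next() {
    String task = tasks.poll();
    if (task == null) {
      return Optional.empty();
    }
    try {
      if (arbiter.tryAcquire(task)) {
        return Optional.of(task);
      }
    } catch (IOException ioe) {
      handleException(ioe);
    }
    // Assumption: the task is simply dropped here; re-queueing is another option.
    return Optional.empty();
  }

  private void handleException(IOException ioe) {
    exceptionHandler.accept(ioe);
  }
}
```

   with this, a caller could pass a handler that logs and moves on, and the
   stream keeps serving later tasks after a failure.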



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new 
FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the 
subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }

Review Comment:
   following from the concept of a stream, we don't expect this to ever end, do
   we?  if not, `hasNext()` should always return `true`, with the understanding
   that some calls to `next()` may block.
   
   likely not needed, but if we did find it worth having a method to "peek" for
   the next task w/o blocking indefinitely, we could add that in addition to
   `next()`
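   
   roughly what I mean, as a minimal sketch.  `EndlessTaskStream` and
   `peekOrNull()` are hypothetical names; `take()` and `peek()` are the standard
   `BlockingDeque` calls:

```java
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class EndlessTaskStream<T> implements Iterator<T> {
  private final BlockingDeque<T> tasks = new LinkedBlockingDeque<>();

  void add(T task) {
    tasks.add(task);
  }

  // A stream never ends, so there is always (eventually) a next element.
  @Override
  public boolean hasNext() {
    return true;
  }

  // Blocks until a task becomes available.
  @Override
  public T next() {
    try {
      return tasks.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      throw new IllegalStateException("interrupted while waiting for the next task", e);
    }
  }

  // Optional non-blocking look at the head; returns null when nothing is queued.
  T peekOrNull() {
    return tasks.peek();
  }
}
```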



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new 
FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the 
subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Optional.absent();
+  }
+
+  @Override
+  public void launchFlow(LaunchDagTask launchDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    FlowId flowId = new 
FlowId().setFlowGroup(launchDagTask.flowGroup).setFlowName(launchDagTask.flowName);
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
+      launchDagTask.initialize(optionalJobExecutionPlanDag.get().getNodes(), 
triggerTimeStamp);
+      this.taskStream.offer(launchDagTask);

Review Comment:
   `offer` returns a boolean indicating whether or not the element was added,
   and that result is ignored here.  I believe we wish to block until the
   element can eventually be added; that would be `put`
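   
   to illustrate the difference, a small sketch against a bounded deque
   (`OfferVersusPut` is a hypothetical helper, and tasks are plain strings).
   note the no-arg `LinkedBlockingDeque` has capacity `Integer.MAX_VALUE`, so
   the distinction only bites once a capacity is configured:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

final class OfferVersusPut {
  // Non-blocking: returns false right away when the deque is full.
  static boolean tryEnqueue(BlockingDeque<String> queue, String task) {
    return queue.offer(task);
  }

  // Blocking: waits for space, so the task is never silently lost.
  static void enqueue(BlockingDeque<String> queue, String task) {
    try {
      queue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      throw new IllegalStateException("interrupted while enqueueing " + task, e);
    }
  }

  // Convenience factory for a deque with a fixed capacity.
  static BlockingDeque<String> bounded(int capacity) {
    return new LinkedBlockingDeque<>(capacity);
  }
}
```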



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FinishResumeDagProc.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.FAILED;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME;
+
+
+/**
+ * An implementation of {@link DagProc} that will process {@link DagTask} will 
PENDING_RESUME job status.
+ * This can be handled either via {@link AdvanceDagProc} or have a separate 
procedure to handle PENDING_RESUME events.
+ * Currently, I have this boilerplate code, but can decide if it makes have a 
separate procedure for completion of
+ * PENDING_RESUME events.
+ */
+@WorkInProgress
+@Slf4j
+public class FinishResumeDagProc extends DagProc {
+  private DagManagementStateStore dagManagementStateStore = new 
DagManagementStateStore();

Review Comment:
   creating a new, empty store here won't work: state must be maintained between
   proc executions, so the proc needs to be handed the shared instance
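   
   a minimal sketch of passing the shared store in, assuming constructor
   injection.  `SharedStateStoreSketch` and `ResumeProcSketch` are hypothetical,
   heavily simplified stand-ins for the real classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a state store shared by all procs.
final class SharedStateStoreSketch {
  private final Map<String, String> statusByDagId = new ConcurrentHashMap<>();

  void put(String dagId, String status) {
    statusByDagId.put(dagId, status);
  }

  String get(String dagId) {
    return statusByDagId.get(dagId);
  }
}

final class ResumeProcSketch {
  // Injected rather than created here, so every proc sees the same state.
  private final SharedStateStoreSketch store;

  ResumeProcSketch(SharedStateStoreSketch store) {
    this.store = store;
  }

  void markResumed(String dagId) {
    store.put(dagId, "RUNNING");
  }

  String statusOf(String dagId) {
    return store.get(dagId);
  }
}
```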



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new 
FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the 
subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }

Review Comment:
   this seems to need reworking, since any lease obtained would need to live in
   the `DagTask` in order to `complete()` it later
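   
   as a sketch of the task carrying its own lease (hypothetical names
   throughout; the real lease type and `complete()` contract are not shown in
   this hunk, so treat these as assumptions):

```java
// Hypothetical lease handle; the real one would talk to the lease arbiter.
final class LeaseSketch {
  private boolean released = false;

  void release() {
    released = true;
  }

  boolean isReleased() {
    return released;
  }
}

// The task keeps the lease it was granted so that it can finish it later.
final class LeasedTaskSketch {
  private final String dagId;
  private final LeaseSketch lease;

  LeasedTaskSketch(String dagId, LeaseSketch lease) {
    this.dagId = dagId;
    this.lease = lease;
  }

  String getDagId() {
    return dagId;
  }

  // Completing the task releases exactly the lease acquired for it.
  boolean complete() {
    lease.release();
    return lease.isReleased();
  }
}
```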



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link 
DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules 
for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if 
available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, 
DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new 
LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new 
FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the 
subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Optional.absent();
+  }
+
+  @Override
+  public void launchFlow(LaunchDagTask launchDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    FlowId flowId = new 
FlowId().setFlowGroup(launchDagTask.flowGroup).setFlowName(launchDagTask.flowName);
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
+      launchDagTask.initialize(optionalJobExecutionPlanDag.get().getNodes(), 
triggerTimeStamp);
+      this.taskStream.offer(launchDagTask);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to exception 
{}", flowId, e.getMessage());
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag 
action from dagActionStore (check "
+          + "stacktrace) due to exception {}", flowId, e.getMessage());
+    } catch (InterruptedException e) {
+      log.warn("SpecCompiler failed to reach healthy state before compilation 
of flowId {}. Exception: ", flowId, e);
+    }
+  }
+
+  @Override
+  public void resumeFlow(ResumeDagTask resumeDagTask) throws IOException {
+
+    long triggerTimeStamp = System.currentTimeMillis();
+    String dagId = resumeDagTask.resumeDagId.toString();
+    Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
+    if (dag == null) {
+      log.error("Dag " + dagId + " was found in memory but not found in failed 
dag state store");
+      return;
+    }
+    resumeDagTask.initialize(dag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(resumeDagTask);
+
+  }
+
+  @Override
+  public void killFlow(KillDagTask killDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    Map<String, Dag<JobExecutionPlan>> dags = 
this.dagManagementStateStore.getDags();
+    String killDagId = killDagTask.killDagId.toString();
+    if(!dags.containsKey(killDagId)) {
+      log.info("Invalid dag since not present in map. Hence cannot cancel it");
+      return;
+    }
+    Dag<JobExecutionPlan> killDag = dags.get(killDagId);
+    killDagTask.initialize(killDag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(killDagTask);
+
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will try to cancel the job when SLA is reached.
+   *
+   * @param node dag node of the job
+   * @return true if the job is killed because it reached sla
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> 
node) throws ExecutionException, InterruptedException {
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla;
+    if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+      flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+    } else {
+      try {
+        flowSla = DagManagerUtils.getFlowSLA(node);

Review Comment:
   this is way too much processing here.  a task is meant to be a light-weight
   indicator of something to be done.  the corresponding dag proc then
   encapsulates how to actually do that work, by talking to the relevant state
   stores etc.
   
   at this early stage, the `DagTask` not only hasn't been turned into a
   `DagProc`, it hasn't even been pulled from the task stream.  this code,
   comprising "how" to enforce the deadline, belongs inside a `DagProc`.  ditto
   for many of the other methods of this class
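   
   to make the split concrete, a sketch under simplifying assumptions: the
   task carries only the dag ID, while the proc owns the deadline logic and the
   state lookup.  `EnforceDeadlineTaskSketch`, `EnforceDeadlineProcSketch` and
   the map-based "state store" are all hypothetical:

```java
import java.util.Map;

// The task: only "what" needs doing, cheap to create and to enqueue.
final class EnforceDeadlineTaskSketch {
  final String dagId;

  EnforceDeadlineTaskSketch(String dagId) {
    this.dagId = dagId;
  }
}

// The proc: "how" to do it, including any reads from state stores.
final class EnforceDeadlineProcSketch {
  private final Map<String, Long> flowStartMillisByDagId; // stand-in for a state store
  private final long flowSlaMillis;

  EnforceDeadlineProcSketch(Map<String, Long> flowStartMillisByDagId, long flowSlaMillis) {
    this.flowStartMillisByDagId = flowStartMillisByDagId;
    this.flowSlaMillis = flowSlaMillis;
  }

  // Returns true when the flow has run past its SLA and should be cancelled.
  boolean deadlineExceeded(EnforceDeadlineTaskSketch task, long nowMillis) {
    Long startMillis = flowStartMillisByDagId.get(task.dagId);
    if (startMillis == null) {
      return false; // unknown dag: nothing to enforce
    }
    return nowMillis - startMillis > flowSlaMillis;
  }
}
```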



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FinishResumeDagProc.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.FAILED;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME;
+
+
+/**
+ * An implementation of {@link DagProc} that will process {@link DagTask} will 
PENDING_RESUME job status.
+ * This can be handled either via {@link AdvanceDagProc} or have a separate 
procedure to handle PENDING_RESUME events.
+ * Currently, I have this boilerplate code, but can decide if it makes have a 
separate procedure for completion of
+ * PENDING_RESUME events.
+ */
+@WorkInProgress
+@Slf4j
+public class FinishResumeDagProc extends DagProc {
+  private DagManagementStateStore dagManagementStateStore = new 
DagManagementStateStore();
+
+  private DagTaskStream dagTaskStream;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+
+  @Override
+  protected Object initialize() {

Review Comment:
   looks like `act` starts off by reading state.  why not perform that here, in
   `initialize()`?  the idea with separating them is to recognize which kind of
   failure should be retried, e.g. did we fail to initialize or fail in taking
   our action?  if only the latter, then we may continue to use the initialized
   state and merely try to `act` once again
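   
   a sketch of how that separation could pay off.  `TwoPhaseProcSketch` and
   `process(maxActAttempts)` are hypothetical, and retrying on any `Exception`
   is a simplification (a real version would retry only failures it knows to be
   retryable):

```java
abstract class TwoPhaseProcSketch<S, R> {
  // Phase 1: read whatever state the action will need.
  protected abstract S initialize() throws Exception;

  // Phase 2: take the action, using only the already-initialized state.
  protected abstract R act(S state) throws Exception;

  // Initializes once, then retries only the action on failure.
  final R process(int maxActAttempts) throws Exception {
    if (maxActAttempts < 1) {
      throw new IllegalArgumentException("maxActAttempts must be >= 1");
    }
    S state = initialize(); // a failure here is not retried by this sketch
    Exception last = null;
    for (int attempt = 1; attempt <= maxActAttempts; attempt++) {
      try {
        return act(state); // reuse the same state on every attempt
      } catch (Exception e) {
        last = e;
      }
    }
    throw last;
  }
}
```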



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Optional.absent();
+  }
+
+  @Override
+  public void launchFlow(LaunchDagTask launchDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    FlowId flowId = new FlowId().setFlowGroup(launchDagTask.flowGroup).setFlowName(launchDagTask.flowName);
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
+      launchDagTask.initialize(optionalJobExecutionPlanDag.get().getNodes(), triggerTimeStamp);
+      this.taskStream.offer(launchDagTask);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
+          + "stacktrace) due to exception {}", flowId, e.getMessage());
+    } catch (InterruptedException e) {
+      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+    }
+  }
+
+  @Override
+  public void resumeFlow(ResumeDagTask resumeDagTask) throws IOException {
+
+    long triggerTimeStamp = System.currentTimeMillis();
+    String dagId = resumeDagTask.resumeDagId.toString();
+    Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
+    if (dag == null) {
+      log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
+      return;
+    }
+    resumeDagTask.initialize(dag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(resumeDagTask);
+
+  }
+
+  @Override
+  public void killFlow(KillDagTask killDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    Map<String, Dag<JobExecutionPlan>> dags = this.dagManagementStateStore.getDags();
+    String killDagId = killDagTask.killDagId.toString();
+    if(!dags.containsKey(killDagId)) {
+      log.info("Invalid dag since not present in map. Hence cannot cancel it");
+      return;
+    }
+    Dag<JobExecutionPlan> killDag = dags.get(killDagId);
+    killDagTask.initialize(killDag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(killDagTask);
+
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will try to cancel the job when SLA is reached.
+   *
+   * @param node dag node of the job
+   * @return true if the job is killed because it reached sla
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla;
+    if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+      flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+    } else {
+      try {
+        flowSla = DagManagerUtils.getFlowSLA(node);
+      } catch (ConfigException e) {
+        log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+            DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+        flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+      }
+      this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+    }
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+          node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+          node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      this.dagManagementStateStore.getDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      this.dagManagementStateStore.getDags().get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
+
+      return true;
+    }
+    return false;
+  }
+  /**
+   * Cancel the job if the job has been "orphaned". A job is orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, this.defaultJobStartSlaTimeMillis);
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      String dagId = DagManagerUtils.generateDagId(node).toString();
+      dagManagementStateStore.getDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dagManagementStateStore.getDags().get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
+      return true;
+    } else {
+      return false;
+    }
+
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  protected JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  protected JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
+  }
+  protected JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;
+    }

Review Comment:
   looks like an `Optional` would be preferable
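
   something along these lines, as a minimal sketch. It uses `java.util.Optional` to stay dependency-free, whereas this file imports Guava's `com.google.common.base.Optional` (there the equivalents would be `Optional.fromNullable` and `Optional.absent`). `FirstOrAbsent` and `first` are made-up names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;

// Hypothetical helper; in the PR this logic would live at the end of pollStatus.
final class FirstOrAbsent {
  /** Returns the first element of the iterator, or an empty Optional if there is none. */
  static <T> Optional<T> first(Iterator<T> it) {
    return it.hasNext() ? Optional.ofNullable(it.next()) : Optional.empty();
  }

  public static void main(String[] args) {
    Optional<String> found = first(Arrays.asList("RUNNING", "COMPLETE").iterator());
    Optional<String> missing = first(Collections.<String>emptyIterator());
    System.out.println(found.orElse("no status"));
    System.out.println(missing.isPresent());
  }
}
```

   callers such as `enforceJobStartDeadline` would then check `isPresent()` rather than comparing against `null`.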



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+
+/**
+ * Interface defining {@link DagTask} based on the type of visitor.
+ * @param <T>
+ */
+@WorkInProgress
+public interface DagTaskVisitor<T> {
+  T meet(LaunchDagTask launchDagTask)
+      throws IOException, InstantiationException, IllegalAccessException;
+  T meet(KillDagTask killDagTask) throws IOException;

Review Comment:
   I'd expected them to all have identical exception signatures...
   
   anyway, given that this is a generic (so we can presume nothing about the 
return type and hence what it's doing), the only appropriate options are either 
to declare `throws Exception` or to declare nothing and wrap everything 
internally in `RuntimeException`
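
   to make the two options concrete, a small sketch. `LaunchTask`, `KillTask`, `ThrowingVisitor`, `QuietVisitor` and `DescribingVisitor` are all made-up names standing in for the PR's types, and `UncheckedIOException` is just one possible `RuntimeException` wrapper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class VisitorSignatureDemo {
  public static void main(String[] args) {
    QuietVisitor<String> visitor = new DescribingVisitor();
    System.out.println(visitor.meet(new LaunchTask()));
    System.out.println(visitor.meet(new KillTask()));
  }
}

// Hypothetical, trimmed-down task types standing in for LaunchDagTask and KillDagTask.
final class LaunchTask {}
final class KillTask {}

/** Option 1: every overload declares the same broad signature. */
interface ThrowingVisitor<T> {
  T meet(LaunchTask task) throws Exception;
  T meet(KillTask task) throws Exception;
}

/** Option 2: no checked exceptions; implementations wrap them internally. */
interface QuietVisitor<T> {
  T meet(LaunchTask task);
  T meet(KillTask task);
}

/** A visitor following option 2: the IOException never leaks into the interface. */
final class DescribingVisitor implements QuietVisitor<String> {
  @Override
  public String meet(LaunchTask task) {
    return describe("launch");
  }

  @Override
  public String meet(KillTask task) {
    return describe("kill");
  }

  private String describe(String action) {
    try {
      return render(action);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Stand-in for work that can fail with a checked exception.
  private String render(String action) throws IOException {
    if (action.isEmpty()) {
      throw new IOException("no action given");
    }
    return action + " handled";
  }
}
```

   either way, every `meet` overload ends up with the same signature, so adding a new task type does not force callers to handle a new mix of checked exceptions.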



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@WorkInProgress
+public final class KillDagProc extends DagProc {

Review Comment:
   once you iron out what belongs in `initialize` vs. `act`, don't forget that 
they should be strongly typed, via `DagProc<S, R>`
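
   for illustration, a minimal sketch of what the typed form could look like, with `S` as the state type and `R` as the result type. `TypedDagProc` and `CountingKillProc` are made-up names, and the state/result types chosen here are assumptions, not what `KillDagProc` must use.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical typed base class; S is the state type, R the result type.
abstract class TypedDagProc<S, R> {
  protected abstract S initialize();
  protected abstract R act(S state);
  protected abstract void sendNotification(R result);

  /** Runs the three phases in order, passing typed values between them. */
  public final R process() {
    S state = initialize();
    R result = act(state);
    sendNotification(result);
    return result;
  }

  public static void main(String[] args) {
    System.out.println(new CountingKillProc().process());
  }
}

// Toy subclass: state is a list of node names, result is how many were cancelled.
final class CountingKillProc extends TypedDagProc<List<String>, Integer> {
  String lastNotification = "";

  @Override
  protected List<String> initialize() {
    return Arrays.asList("job1", "job2");
  }

  @Override
  protected Integer act(List<String> nodesToCancel) {
    return nodesToCancel.size(); // no cast from Object needed
  }

  @Override
  protected void sendNotification(Integer cancelledCount) {
    lastNotification = "cancelled " + cancelledCount + " node(s)";
  }
}
```

   with the type parameters in place, the compiler checks that what `initialize` returns is what `act` accepts, instead of relying on casts from `Object`.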



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagTask.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+@WorkInProgress
+public class LaunchDagTask extends DagTask {
+  String flowGroup;
+  String flowName;
+
+  public LaunchDagTask(String flowGroup, String flowName) {
+    this.flowGroup = flowGroup;
+    this.flowName = flowName;
+  }
+
+  /**
+   * initializes the job properties associated with a {@link DagTask}
+   * @param dagNodes
+   * @param triggerTimeStamp
+   */
+  @Override
+  void initialize(Object dagNodes, long triggerTimeStamp) {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToKill = (List<Dag.DagNode<JobExecutionPlan>>) dagNodes;

Review Comment:
   why are we involved w/ nodes to kill when launching a dag?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AdvanceDagProc.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link DagProc} dealing which advancing to the next node in the {@link Dag}.
+ * This Dag Procedure will deal with pending Job statuses such as: PENDING, PENDING_RESUME, PENDING_RETRY
+ * as well jobs that have reached an end state with statuses such as: COMPLETED, FAILED and CANCELLED.
+ * Primarily, it will be responsible for polling the flow and job statuses
+ */
+@Slf4j
+@WorkInProgress
+public class AdvanceDagProc extends DagProc {
+  private Optional<DagActionStore> dagActionStore;
+  private Optional<EventSubmitter> eventSubmitter;
+  private DagStateStore dagStateStore;
+  private MetricContext metricContext;
+  private DagManagementStateStore dagManagementStateStore;
+  private DagManagerMetrics dagManagerMetrics;
+  private MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  private UserQuotaManager quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+      ConfigUtils.getString(ConfigBuilder.create().build(), ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+      ConfigBuilder.create().build());
+
+  public AdvanceDagProc() throws IOException {
+    //TODO: add this to dagproc factory instead
+    this.dagManagementStateStore = new DagManagementStateStore();
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.multiActiveLeaseArbiter = new MysqlMultiActiveLeaseArbiter(ConfigBuilder.create().build());
+    this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
+
+  }
+  @Override
+  protected Object initialize() {
+    return null;
+  }
+
+  @Override
+  protected Object act(Object state) throws ExecutionException, InterruptedException, IOException {
+    return null;
+  }
+
+  @Override
+  protected void sendNotification(Object result) throws MaybeRetryableException {
+
+  }
+  private void initialize(Dag<JobExecutionPlan> dag)

Review Comment:
   what's the relationship between this private `initialize` and the protected 
one?  who should call this one?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManagerBoilerPlate.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@WorkInProgress
+@Slf4j
+@AllArgsConstructor
+public class NewDagManagerBoilerPlate {
+  private DagTaskStream dagTaskStream;
+  private DagProcFactory dagProcFactory;
+  private DagManager.DagManagerThread [] dagManagerThreads;
+  //TODO instantiate DMT
+
+  @WorkInProgress
+  @AllArgsConstructor
+  public static class DagManagerThread implements Runnable {
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    @Override
+    public void run() {
+      try {
+        while (dagTaskStream.hasNext()) {
+          Optional<DagTask> dagTask = getNextTask();
+          if (dagTask.isPresent()) {
+            DagProc dagProc = dagTask.get().host(dagProcFactory);
+            dagProc.process(dagTask.get().leaseAttemptStatus);
+            //TODO: Handle cleaning up of Dags
+            cleanUpDagTask();

Review Comment:
   unclear, where/why does this arise?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagTask.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+@WorkInProgress
+public class LaunchDagTask extends DagTask {
+  String flowGroup;
+  String flowName;
+
+  public LaunchDagTask(String flowGroup, String flowName) {

Review Comment:
   I thought we made an earlier change for multi-leader where we pre-assign the 
flowExecId, no?  if so, that should be among the params here
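
   i.e. roughly the shape below, assuming the execution id is indeed assigned before the task is created. `LaunchTaskSketch`, the `flowExecutionId` field name and its `long` type are guesses for illustration only.

```java
// Hypothetical variant of LaunchDagTask; the extra field's name and type are assumptions.
final class LaunchTaskSketch {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId; // pre-assigned by the caller, not generated at launch time

  LaunchTaskSketch(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  /** Identifies one execution of one flow. */
  String describe() {
    return flowGroup + "/" + flowName + "/" + flowExecutionId;
  }

  public static void main(String[] args) {
    System.out.println(new LaunchTaskSketch("myGroup", "myFlow", 42L).describe());
  }
}
```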



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+@WorkInProgress
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+  @Getter
+  private final BlockingDeque<DagTask> taskStream = new LinkedBlockingDeque<>();
+  private JobStatusRetriever jobStatusRetriever;
+  private Optional<Timer> jobStatusPolledTimer;
+
+  private DagManagerMetrics dagManagerMetrics;
+
+  private DagManagementStateStore dagManagementStateStore;
+
+  private Long defaultJobStartSlaTimeMillis;
+  private FlowTriggerHandler flowTriggerHandler;
+  private Optional<DagActionStore> dagActionStore;
+  private DagStateStore failedDagStateStore;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private FlowCatalog flowCatalog = new FlowCatalog(ConfigBuilder.create().build());
+
+  //TODO: add ctor for instantiating the attributes (will be handled in the subsequent PR)
+
+  @Override
+  public boolean hasNext() {
+    return !taskStream.isEmpty();
+  }
+
+  @Override
+  public Optional<DagTask> next() {
+
+    DagTask dagTask = taskStream.peek();
+
+    try {
+      if(flowTriggerHandler.attemptDagTaskLeaseAcquisition(dagTask)) {
+        return Optional.of(taskStream.poll());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Optional.absent();
+  }
+
+  @Override
+  public void launchFlow(LaunchDagTask launchDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    FlowId flowId = new FlowId().setFlowGroup(launchDagTask.flowGroup).setFlowName(launchDagTask.flowName);
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
+      launchDagTask.initialize(optionalJobExecutionPlanDag.get().getNodes(), 
triggerTimeStamp);
+      this.taskStream.offer(launchDagTask);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
+          + "stacktrace) due to exception {}", flowId, e.getMessage());
+    } catch (InterruptedException e) {
+      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+    }
+  }
+
+  @Override
+  public void resumeFlow(ResumeDagTask resumeDagTask) throws IOException {
+
+    long triggerTimeStamp = System.currentTimeMillis();
+    String dagId = resumeDagTask.resumeDagId.toString();
+    Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
+    if (dag == null) {
+      log.error("Dag " + dagId + " was found in memory but not found in failed dag state store");
+      return;
+    }
+    resumeDagTask.initialize(dag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(resumeDagTask);
+
+  }
+
+  @Override
+  public void killFlow(KillDagTask killDagTask) {
+    long triggerTimeStamp = System.currentTimeMillis();
+    Map<String, Dag<JobExecutionPlan>> dags = this.dagManagementStateStore.getDags();
+    String killDagId = killDagTask.killDagId.toString();
+    if(!dags.containsKey(killDagId)) {
+      log.info("Invalid dag since not present in map. Hence cannot cancel it");
+      return;
+    }
+    Dag<JobExecutionPlan> killDag = dags.get(killDagId);
+    killDagTask.initialize(killDag.getNodes(), triggerTimeStamp);
+    this.taskStream.offer(killDagTask);
+
+  }
+  /**
+   * Check if the SLA is configured for the flow this job belongs to.
+   * If it is, this method will try to cancel the job when SLA is reached.
+   *
+   * @param node dag node of the job
+   * @return true if the job is killed because it reached sla
+   * @throws ExecutionException exception
+   * @throws InterruptedException exception
+   */
+  @Override
+  public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+    long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+    long currentTime = System.currentTimeMillis();
+    String dagId = DagManagerUtils.generateDagId(node).toString();
+
+    long flowSla;
+    if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+      flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+    } else {
+      try {
+        flowSla = DagManagerUtils.getFlowSLA(node);
+      } catch (ConfigException e) {
+        log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+            node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+            DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+        flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+      }
+      this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+    }
+
+    if (currentTime > flowStartTime + flowSla) {
+      log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+          node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+          node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+      dagManagerMetrics.incrementExecutorSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      this.dagManagementStateStore.getDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      this.dagManagementStateStore.getDags().get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
+
+      return true;
+    }
+    return false;
+  }
+  /**
+   * Cancel the job if the job has been "orphaned". A job is orphaned if has been in ORCHESTRATED
+   * {@link ExecutionStatus} for some specific amount of time.
+   * @param node {@link Dag.DagNode} representing the job
+   * @param jobStatus current {@link JobStatus} of the job
+   * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+   * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+   */
+  @Override
+  public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, JobStatus jobStatus) throws ExecutionException, InterruptedException {
+    if (jobStatus == null) {
+      return false;
+    }
+    ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, this.defaultJobStartSlaTimeMillis);
+    long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
+      log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
+          DagManagerUtils.getJobName(node),
+          DagManagerUtils.getFullyQualifiedDagName(node),
+          timeOutForJobStart);
+      dagManagerMetrics.incrementCountsStartSlaExceeded(node);
+      KillDagProc.killDagNode(node);
+
+      String dagId = DagManagerUtils.generateDagId(node).toString();
+      dagManagementStateStore.getDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dagManagementStateStore.getDags().get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
+      return true;
+    } else {
+      return false;
+    }
+
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  protected JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  protected JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
+  }
+  protected JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {

Review Comment:
   naming-wise, you're not actually polling anything, are you?  this seems more like a one-time `retrieveJobStatus`, `getJobStatus`, or `loadJobStatus`, no?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +148,26 @@ protected void processMessage(DecodeableKafkaRecord message) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
+
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow request");
           dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
+          //TODO: add a flag for if condition only if multi-active is enabled
+          if(isRefactoredDagManagerEnabled) {
+            ResumeDagTask resumeDagTask = new ResumeDagTask(new DagManager.DagId(flowGroup, flowName, flowExecutionId));
+            dagTaskStream.resumeFlow(resumeDagTask);
+          }

Review Comment:
   maybe interpret the flag earlier and turn this into an `Optional`, so the call site simplifies to `.ifPresent` (which is part of Java's `java.util.Optional`, but not Guava's `Optional`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
