[ https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=904486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904486 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Feb/24 00:40
            Start Date: 10/Feb/24 00:40
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1484848855


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);

Review Comment:
   naming confuses me.  is this equivalent to `getParentDag`?  if so, shouldn't that be a method on `Dag.DagNode`?
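One reading of this suggestion: if each node keeps a back-reference to the DAG that owns it, the lookup becomes `dagNode.getParentDag()` and the state store no longer needs `getDagForJob`. The sketch below is minimal and self-contained. `SimpleDag`, `SimpleDagNode` and `getParentDag` are hypothetical stand-ins, not Gobblin's `Dag`/`Dag.DagNode` API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for Dag<T>: it creates its own nodes, so every node knows its owner. */
final class SimpleDag<T> {
  private final String dagId;
  private final List<SimpleDagNode<T>> nodes = new ArrayList<>();

  SimpleDag(String dagId) {
    this.dagId = dagId;
  }

  /** Creating nodes through the DAG keeps each node's parent link set and correct. */
  SimpleDagNode<T> addNode(T value) {
    SimpleDagNode<T> node = new SimpleDagNode<>(value, this);
    nodes.add(node);
    return node;
  }

  String getDagId() {
    return dagId;
  }

  List<SimpleDagNode<T>> getNodes() {
    return Collections.unmodifiableList(nodes);
  }
}

/** Hypothetical stand-in for Dag.DagNode<T> with a back-reference to its DAG. */
final class SimpleDagNode<T> {
  private final T value;
  private final SimpleDag<T> parentDag;

  SimpleDagNode(T value, SimpleDag<T> parentDag) {
    this.value = value;
    this.parentDag = parentDag;
  }

  T getValue() {
    return value;
  }

  /** Takes over the role of the store-level getDagForJob(dagNode). */
  SimpleDag<T> getParentDag() {
    return parentDag;
  }
}
```

This has a cost. `DagNode` gains a dependency on `Dag`, and the link must stay valid whenever nodes are copied or rebuilt, for example after a DAG is deserialized.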



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);

Review Comment:
   add javadoc to clarify if return value is same as param passed in or was the previous value that might now be replaced...
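Suppose the store followed `java.util.Map.put` semantics. The javadoc could then state that contract explicitly, as in the sketch below. The sketch assumes that contract: it returns the previously stored DAG, or `null`. `InMemoryDagStore` is a hypothetical class, and the type parameter `D` stands in for `Dag<JobExecutionPlan>`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory store; D stands in for Dag<JobExecutionPlan>. */
final class InMemoryDagStore<D> {
  private final Map<String, D> dags = new HashMap<>();

  /**
   * Stores {@code dag} under {@code dagId}, replacing any DAG already stored there.
   *
   * @param dagId the id to store the DAG under
   * @param dag the DAG to store
   * @return the DAG previously stored under {@code dagId}, or {@code null} if there was none;
   *     this is NOT the {@code dag} argument (same contract as {@link Map#put})
   */
  D addDag(String dagId, D dag) {
    return dags.put(dagId, dag);
  }

  D getDag(String dagId) {
    return dags.get(dagId);
  }
}
```

Other contracts would be just as valid, such as returning the argument or rejecting duplicate ids with an exception. What matters is that the javadoc says which one applies.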



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  boolean addFailedDag(String dagId);

Review Comment:
   nit: `markDagFailed`?  it doesn't add a `Dag`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  boolean addFailedDag(String dagId);
+  boolean existsFailedDag(String dagId);
+  boolean addCleanUpDag(String dagId);
+  boolean checkCleanUpDag(String dagId);

Review Comment:
   if `existsFailedDag` and `checkCleanUpDag` are parallels, let's align the naming
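If the two methods are parallels, one way to align them is to pair a verb that sets each marker with an `is...` query that reads it. The names below are hypothetical, building on the earlier `markDagFailed` suggestion: `markDagFailed`/`isDagFailed` and `markDagForCleanup`/`isDagMarkedForCleanup`. The sketch also assumes that a `boolean` return means "newly marked", in the manner of `java.util.Set.add`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical naming sketch: each "mark" method is paired with an "is" query of the same shape. */
interface DagMarkers {
  /** @return true if the DAG was newly marked failed, false if it was already marked */
  boolean markDagFailed(String dagId);

  boolean isDagFailed(String dagId);

  /** @return true if the DAG was newly marked for cleanup, false if it was already marked */
  boolean markDagForCleanup(String dagId);

  boolean isDagMarkedForCleanup(String dagId);
}

/** In-memory implementation; the boolean returns follow Set.add semantics. */
final class InMemoryDagMarkers implements DagMarkers {
  private final Set<String> failedDagIds = ConcurrentHashMap.newKeySet();
  private final Set<String> cleanupDagIds = ConcurrentHashMap.newKeySet();

  @Override
  public boolean markDagFailed(String dagId) {
    return failedDagIds.add(dagId);
  }

  @Override
  public boolean isDagFailed(String dagId) {
    return failedDagIds.contains(dagId);
  }

  @Override
  public boolean markDagForCleanup(String dagId) {
    return cleanupDagIds.add(dagId);
  }

  @Override
  public boolean isDagMarkedForCleanup(String dagId) {
    return cleanupDagIds.contains(dagId);
  }
}
```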



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  boolean addFailedDag(String dagId);
+  boolean existsFailedDag(String dagId);
+  boolean addCleanUpDag(String dagId);
+  boolean checkCleanUpDag(String dagId);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;
+  Map<String, Long> getDagToSLA();

Review Comment:
   1. we have two "SLAs"... which is this?
   2. they aren't truly SLAs, but deadlines, so suggest changing naming (even if we retain dual config for compatibility, e.g. `flow.start.deadline`)
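To rename while keeping backward compatibility, a lookup could prefer the new deadline key and fall back to the legacy SLA key. Only `flow.start.deadline` comes from the comment. The caller passes the legacy key in, because its exact name isn't shown here. `DeadlineConfig` is hypothetical, and `java.util.Properties` stands in for the service's real config type.

```java
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: reads a deadline from its new key, falling back to a legacy SLA key. */
final class DeadlineConfig {
  /** Key name taken from the review comment; the value's unit is supplied by the caller. */
  static final String FLOW_START_DEADLINE_KEY = "flow.start.deadline";

  private DeadlineConfig() {}

  /**
   * @return the deadline in milliseconds, or empty if neither key is set;
   *     the new key wins when both are present
   */
  static Optional<Long> resolveDeadlineMillis(Properties props, String newKey, String legacyKey, TimeUnit unit) {
    String raw = props.getProperty(newKey);
    if (raw == null) {
      raw = props.getProperty(legacyKey); // backward compatibility with the old "SLA" naming
    }
    if (raw == null) {
      return Optional.empty();
    }
    return Optional.of(unit.toMillis(Long.parseLong(raw.trim())));
  }
}
```

The two SLAs may be a job-start deadline and an overall flow deadline. In that case, splitting `getDagToSLA()` into two accessors would also answer the first question. The names are hypothetical, for example `getDagToStartDeadline()` and `getDagToFlowDeadline()`.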



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,33 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<D extends DagProc> {
+  LaunchDagProc meet(LaunchDagTask launchDagTask);

Review Comment:
   let's not hard-code the type mapping between each DagTask and DagProc. in fact, let's not even hard-code that each `DagTask` must turn into a `DagProc`.
   
   this is the reason for having a generic param: so the parameterizable generic type is returned by every `meet` method.  to be fully general, the type param should stand-alone and not `extends DagProc`.
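The fully general shape might look like the sketch below. The visitor's type parameter is unbounded, every `meet` returns it, and each task dispatches to the right overload. The task classes are simplified stand-ins. The `host` method name is a hypothetical double-dispatch hook. `DagTaskLabeler` is an illustrative visitor that produces no `DagProc` at all.

```java
/** Hypothetical visitor: the result type T is unconstrained, not bounded by DagProc. */
interface DagTaskVisitor<T> {
  T meet(LaunchDagTask launchDagTask);

  T meet(KillDagTask killDagTask);

  T meet(AdvanceDagTask advanceDagTask);
}

/** Simplified stand-in for DagTask; host(...) is a hypothetical double-dispatch hook. */
abstract class DagTask {
  abstract <T> T host(DagTaskVisitor<T> visitor);
}

final class LaunchDagTask extends DagTask {
  @Override
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this); // static type of `this` selects meet(LaunchDagTask)
  }
}

final class KillDagTask extends DagTask {
  @Override
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

final class AdvanceDagTask extends DagTask {
  @Override
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

/** A visitor that yields labels (e.g. for logging) instead of DagProcs. */
final class DagTaskLabeler implements DagTaskVisitor<String> {
  @Override
  public String meet(LaunchDagTask launchDagTask) {
    return "launch";
  }

  @Override
  public String meet(KillDagTask killDagTask) {
    return "kill";
  }

  @Override
  public String meet(AdvanceDagTask advanceDagTask) {
    return "advance";
  }
}
```

Under this shape, the proc factory becomes just one implementation, e.g. `implements DagTaskVisitor<DagProc<?, ?>>`. The interface no longer dictates that `meet(LaunchDagTask)` returns a `LaunchDagProc`.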



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+  private final LaunchDagTask launchDagTask;
+  NewDagManager newDagManager;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager newDagManager) {
+    this.launchDagTask = launchDagTask;
+    this.newDagManager = newDagManager;
+  }
+
+  @Override
+  protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+  }
+
+  @Override
+  protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+    if (dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return null;
+    }
+    initializeDag(dag, dagManagementStateStore);
+    DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+    return null;
+  }
+
+  protected void initializeDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
+        dagManagementStateStore.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+        isDagRunning = true;
+      }
+    }
+
+    FlowId flowId = DagManagerUtils.getFlowId(dag);
+    NewDagManager.getDagManagerMetrics().registerFlowMetric(flowId, dag);
+
+    log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
+    //Determine the next set of jobs to run and submit them for execution
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagManagementStateStore, dagId);
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
+      dagManagementStateStore.addJobState(dagId, dagNode);
+    }
+
+    // Set flow status to running
+    DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);

Review Comment:
   I originally thought `act` might set things up, but `commit` would actually record them and therefore send out GTEs of this sort... we should discuss...
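The sketch below illustrates one side of that discussion. `act` performs the work and returns what happened. A separate `commit` phase then records the outcome and emits GTEs such as `FLOW_RUNNING`, but only once `act` has succeeded. `SketchDagProc`, `commit`, `LaunchSketch` and the `List<String>` event sink are hypothetical stand-ins, not the PR's `DagProc` API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical template: S is the initialized state, R the result of act. */
abstract class SketchDagProc<S, R> {
  /** Runs the phases in order; if initialize or act throws, commit never runs. */
  final R process(List<String> emittedEvents) throws IOException {
    S state = initialize();
    R result = act(state);
    commit(result, emittedEvents);
    return result;
  }

  protected abstract S initialize() throws IOException;

  /** Performs the work but emits no externally visible events. */
  protected abstract R act(S state) throws IOException;

  /** Records the outcome and emits events (e.g. FLOW_RUNNING) only after act succeeded. */
  protected abstract void commit(R result, List<String> emittedEvents) throws IOException;
}

/** Toy launch proc; failInAct simulates a job-submission failure. */
final class LaunchSketch extends SketchDagProc<String, List<String>> {
  private final boolean failInAct;

  LaunchSketch(boolean failInAct) {
    this.failInAct = failInAct;
  }

  @Override
  protected String initialize() {
    return "dag-1";
  }

  @Override
  protected List<String> act(String dagId) throws IOException {
    if (failInAct) {
      throw new IOException("could not submit jobs for " + dagId);
    }
    List<String> submittedJobs = new ArrayList<>();
    submittedJobs.add(dagId + "/job-1");
    return submittedJobs;
  }

  @Override
  protected void commit(List<String> submittedJobs, List<String> emittedEvents) {
    emittedEvents.add("FLOW_RUNNING");
  }
}
```

The upside is that a failed `act` leaves no misleading `FLOW_RUNNING` event behind. The cost is that `act` must return enough information for `commit` to emit the right events.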



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/event/JobStatusEvent.java:
##########
@@ -0,0 +1,53 @@
+
+package org.apache.gobblin.service.monitoring.event;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+
+/**
+ * An object that {@link org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor} emits when it receives a final
+ * job status GTE.
+ */
+@Getter
+public class JobStatusEvent {
+  State jobStatusState;
+  String flowGroup;
+  String flowName;
+  long flowExecutionId;
+  String jobGroup;
+  String jobName;
+  ExecutionStatus status;
+  JobStatus jobStatus;

Review Comment:
   what are the pros/cons of having the KJSM send these along vs. having the recipient look them up fresh from the `DMStateStore`?  I would think the latter approach would guard against out-of-date info...
   
   (clearly, if you removed these, you'd need to rename this class... `UpdatedJobEvent`, since you'd only send info about the job that was updated, not specifically about what was updated...)
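Here is a sketch of the lighter-weight alternative. The monitor sends only the identity of the job that changed. The recipient reads the current status from the store when it handles the event, so it never acts on a snapshot that went stale while queued. `UpdatedJobEvent` is the name floated in the comment. The rest is hypothetical: `JobStatusLookup`, the key format, and a `String` status standing in for `ExecutionStatus`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Carries only the identity of the updated job, not a snapshot of its status. */
final class UpdatedJobEvent {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;
  private final String jobGroup;
  private final String jobName;

  UpdatedJobEvent(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.jobGroup = jobGroup;
    this.jobName = jobName;
  }

  /** Lookup key; this format is an assumption for the sketch. */
  String jobKey() {
    return String.join(".", flowGroup, flowName, Long.toString(flowExecutionId), jobGroup, jobName);
  }
}

/** Hypothetical read side of the state store. */
interface JobStatusLookup {
  Optional<String> getLatestStatus(String jobKey);
}

final class InMemoryJobStatusLookup implements JobStatusLookup {
  private final Map<String, String> statuses = new HashMap<>();

  void put(String jobKey, String status) {
    statuses.put(jobKey, status);
  }

  @Override
  public Optional<String> getLatestStatus(String jobKey) {
    return Optional.ofNullable(statuses.get(jobKey));
  }
}

/** Reads the status when the event is handled, not when it was emitted. */
final class UpdatedJobHandler {
  private final JobStatusLookup lookup;

  UpdatedJobHandler(JobStatusLookup lookup) {
    this.lookup = lookup;
  }

  String handle(UpdatedJobEvent event) {
    return lookup.getLatestStatus(event.jobKey()).orElse("UNKNOWN");
  }
}
```

This costs one extra store read per event. It also only helps if the store the recipient reads is at least as fresh as what the KJSM saw.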



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,72 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected final EventSubmitter eventSubmitter;
+  protected final AtomicLong orchestrationDelay;

Review Comment:
   do all `DagProc`s need this or only some?  ...also, not completely clear what it is, so javadoc would help



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity, Optional<Dag<JobExecutionPlan>>> {
+
+  // should dag task be a part of dag proc?
+  private final KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));
+    return new CancelEntity(dagToCancel, dagNodesToCancel);
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, CancelEntity cancelEntity) throws IOException {
+    if (cancelEntity.dagToCancel == null || cancelEntity.dagNodesToCancel.isEmpty()) {
+      log.warn("No dag with id " + this.killDagTask.getDagId() + " found to kill");
+      return Optional.empty();
+    }
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : cancelEntity.dagNodesToCancel) {
+      cancelDagNode(cancelEntity.dagToCancel, dagNodeToCancel, dagManagementStateStore);
+    }
+    return Optional.of(cancelEntity.dagToCancel);
+  }
+
+  private void cancelDagNode(Dag<JobExecutionPlan> dagToCancel, Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+    Properties props = new Properties();
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      }
+      if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+        props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+            dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+      }
+      DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
+      dagToCancel.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+      dagToCancel.setMessage("Flow killed by request");
+      dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getDagAction());
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+    this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+    jobExecutionPlan.setExecutionStatus(CANCELLED);
+  }
+
+  @AllArgsConstructor

Review Comment:
   could all the members be `final`?
   
   why not encapsulate behavior here such as:
   ```
       for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : cancelEntity.dagNodesToCancel) {
         cancelDagNode(cancelEntity.dagToCancel, dagNodeToCancel, dagManagementStateStore);
       }
   ```
   as
   ```
   /** @return how many canceled; @throws ...??? */
   public int cancelAllDagNodes(DMStateStore stateStore);
   ```
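Here is a sketch of the suggested encapsulation. It makes every member `final`, moves the loop into `cancelAllDagNodes`, and returns the number of nodes cancelled. To stay self-contained, it uses simplified hypothetical types. Type parameters replace `Dag`/`DagNode`, and a `Canceller` callback stands in for the per-node work `cancelDagNode` does through the `SpecProducer`. The callback also replaces the suggested `DMStateStore` parameter. For the open `@throws` question, it picks one possible answer: stop at the first failure and propagate the `IOException`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical per-node cancellation hook. */
interface Canceller<N> {
  void cancel(N dagNode) throws IOException;
}

/** D stands in for Dag<JobExecutionPlan>, N for Dag.DagNode<JobExecutionPlan>. */
final class CancelEntity<D, N> {
  private final D dagToCancel;
  private final List<N> dagNodesToCancel;

  CancelEntity(D dagToCancel, List<N> dagNodesToCancel) {
    this.dagToCancel = dagToCancel;
    // defensive, unmodifiable copy so the entity stays effectively immutable
    this.dagNodesToCancel = Collections.unmodifiableList(new ArrayList<>(dagNodesToCancel));
  }

  D getDagToCancel() {
    return dagToCancel;
  }

  /** Mirrors the "nothing to kill" check currently done in act. */
  boolean isEmpty() {
    return dagToCancel == null || dagNodesToCancel.isEmpty();
  }

  /**
   * @return how many DAG nodes were cancelled
   * @throws IOException from the first node whose cancellation fails; later nodes are not attempted
   */
  int cancelAllDagNodes(Canceller<N> canceller) throws IOException {
    int cancelled = 0;
    for (N dagNode : dagNodesToCancel) {
      canceller.cancel(dagNode);
      cancelled++;
    }
    return cancelled;
  }
}
```

A "best effort" variant is equally plausible: attempt every node, then throw an aggregate exception. Either way, the javadoc should say which policy applies.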



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,62 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcFactory implements DagTaskVisitor {
+  @Inject private NewDagManager newDagManager;
+
+  @Override
+  public LaunchDagProc meet(LaunchDagTask launchDagTask) {
+    return new LaunchDagProc(launchDagTask, this.newDagManager);

Review Comment:
   rather than constructing the proc w/ the DM, let's have the `DagProcEngine` (or equiv) provide it during the `DagProc.process` method invocation
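Here is the suggested shape in sketch form. The factory builds procs from task data alone, and the engine passes its collaborators into each proc when it runs it. The role names come from the comment. `DagManagerStub`, `ProcSketch`, `LaunchProcSketch` and `DagProcEngineSketch` are hypothetical simplifications of `NewDagManager`, `DagProc`, `LaunchDagProc` and `DagProcEngine`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for NewDagManager; records launches so the effect is observable. */
final class DagManagerStub {
  private final List<String> launchedDagIds = new ArrayList<>();

  void launch(String dagId) {
    launchedDagIds.add(dagId);
  }

  List<String> getLaunchedDagIds() {
    return launchedDagIds;
  }
}

/** A proc receives its collaborators per invocation instead of holding them as fields. */
interface ProcSketch {
  void process(DagManagerStub dagManager);
}

/** What the factory would build: it needs only task data (here, a DAG id). */
final class LaunchProcSketch implements ProcSketch {
  private final String dagId;

  LaunchProcSketch(String dagId) {
    this.dagId = dagId;
  }

  @Override
  public void process(DagManagerStub dagManager) {
    dagManager.launch(dagId);
  }
}

/** Plays the DagProcEngine role: owns the dependency and hands it to each proc it runs. */
final class DagProcEngineSketch {
  private final DagManagerStub dagManager;

  DagProcEngineSketch(DagManagerStub dagManager) {
    this.dagManager = dagManager;
  }

  void run(ProcSketch proc) {
    proc.process(dagManager);
  }
}
```

With this approach, `DagProcFactory` would no longer need its `@Inject`ed `NewDagManager` field. Each proc becomes plain task data plus logic, which is easy to unit-test against a stub.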



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  boolean addFailedDag(String dagId);
+  boolean existsFailedDag(String dagId);
+  boolean addCleanUpDag(String dagId);
+  boolean checkCleanUpDag(String dagId);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);

Review Comment:
   `addDagNodeState`?  `addDagNodeToDag`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;

Review Comment:
   I realize a DagNode is roughly a Job, but for clarity, I'd still recommend `getDagNodes` and `getEveryDagNode`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,62 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.

Review Comment:
   no real need to mention this is a visitor (it's kind of an impl detail), but if you want to I suggest:
   > {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived class to its companion {@link DagProc} derived class
   
   OR alt.
   > for mapping from a specific type of {@link DagTask} to the {@link DagProc} class hierarchy
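A minimal sketch of the mapping both suggested Javadocs describe: each concrete task type is routed to its companion proc type through double dispatch. `Task`, `LaunchTask`, `KillTask`, `Proc`, `TaskVisitor`, and the `host` accept-method are hypothetical stand-ins chosen for illustration, not the PR's actual classes or method names.

```java
// Hypothetical, simplified stand-ins for DagTask, DagProc and DagProcFactory; not Gobblin's real API.
class TaskToProcVisitorSketch {

  /** Stand-in for DagProc. */
  interface Proc {
    String describe();
  }

  /** Stand-in for DagTaskVisitor: one overload per concrete task type. */
  interface TaskVisitor<T> {
    T meet(LaunchTask task);

    T meet(KillTask task);
  }

  /** Stand-in for DagTask: each subclass hands itself to the visitor (double dispatch). */
  abstract static class Task {
    final String dagId;

    Task(String dagId) {
      this.dagId = dagId;
    }

    abstract <T> T host(TaskVisitor<T> visitor);
  }

  static final class LaunchTask extends Task {
    LaunchTask(String dagId) {
      super(dagId);
    }

    @Override
    <T> T host(TaskVisitor<T> visitor) {
      return visitor.meet(this); // static type of `this` selects meet(LaunchTask)
    }
  }

  static final class KillTask extends Task {
    KillTask(String dagId) {
      super(dagId);
    }

    @Override
    <T> T host(TaskVisitor<T> visitor) {
      return visitor.meet(this); // static type of `this` selects meet(KillTask)
    }
  }

  /** Stand-in for DagProcFactory: maps each task type to its companion proc type. */
  static final class ProcFactory implements TaskVisitor<Proc> {
    @Override
    public Proc meet(LaunchTask task) {
      return () -> "launch " + task.dagId;
    }

    @Override
    public Proc meet(KillTask task) {
      return () -> "kill " + task.dagId;
    }
  }
}
```

The overload is chosen inside each `host` override, so the factory needs no `instanceof` checks. This is also why the visitor is arguably an implementation detail not worth naming in the Javadoc.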



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity, Optional<Dag<JobExecutionPlan>>> {
+
+  // should dag task be a part of dag proc?
+  private final KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }

Review Comment:
   the `DagTask`s are to be pojos w/ no "behavior", so it shouldn't be "used" within, but if we want to access fields of it that's OK.
   
   personally, I'd probably define members of this class, then do `KillDagTask` field access in the `DagProcFactory` and finally pass those values into this ctor.  I believe unit tests would be clearest if we merely constructed `KillDagProc` w/ a `dagId`, rather than needing to first create a `KillDagTask`.
   
   nonetheless, if you really want to just retain the singular `KillDagTask`, and access it within, that's not the end of the world.
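A minimal sketch of the reviewer's preferred alternative. Only the factory reads fields off the task, and the proc's constructor takes plain values. `KillTask`, `KillProc`, and `toProc` are hypothetical stand-ins for `KillDagTask`, `KillDagProc`, and the `DagProcFactory` method, and the real task carries more than a `dagId`.

```java
// Hypothetical sketch: the proc is built from plain values, not from the task object.
class ProcFromFieldsSketch {

  /** Stand-in for KillDagTask: plain data only, no behavior. */
  static final class KillTask {
    private final String dagId;

    KillTask(String dagId) {
      this.dagId = dagId;
    }

    String getDagId() {
      return dagId;
    }
  }

  /** Stand-in for KillDagProc: constructed from the values it needs, not from the task. */
  static final class KillProc {
    private final String dagId;

    KillProc(String dagId) {
      this.dagId = dagId;
    }

    String dagId() {
      return dagId;
    }
  }

  /** Stand-in for DagProcFactory: the only place that reads fields off the task. */
  static KillProc toProc(KillTask task) {
    return new KillProc(task.getDagId());
  }
}
```

A unit test can then build `new KillProc("some-dag-id")` directly, without first constructing a task.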
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity, Optional<Dag<JobExecutionPlan>>> {
+
+  // should dag task be a part of dag proc?
+  private final KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));
+    return new CancelEntity(dagToCancel, dagNodesToCancel);
+  }
+
+  @Override
+  public Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, CancelEntity cancelEntity) throws IOException {
+    if (cancelEntity.dagToCancel == null || cancelEntity.dagNodesToCancel.isEmpty()) {
+      log.warn("No dag with id " + this.killDagTask.getDagId() + " found to kill");
+      return Optional.empty();

Review Comment:
   why not return `Optional` from `initialize` and put this check over there?
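Moving the check into `initialize` could look roughly like the following minimal sketch. It assumes a hypothetical template method `process()` that calls `initialize()` and invokes `act()` only when state is present. `StateStore`, `Proc`, and `KillProc` are illustrative stand-ins, not the PR's `DagManagementStateStore`, `DagProc`, or `KillDagProc`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the "nothing to do" check lives in initialize(), so act() never sees null.
class InitializeReturnsOptionalSketch {

  /** Stand-in for DagManagementStateStore: dagId -> job names. */
  static final class StateStore {
    final Map<String, List<String>> jobsByDag = new HashMap<>();
  }

  /** Template method: process() = initialize(), then act() only if state was found. */
  abstract static class Proc<S, R> {
    abstract Optional<S> initialize(StateStore store);

    abstract R act(StateStore store, S state);

    final Optional<R> process(StateStore store) {
      return initialize(store).map(state -> act(store, state));
    }
  }

  static final class KillProc extends Proc<List<String>, Integer> {
    private final String dagId;

    KillProc(String dagId) {
      this.dagId = dagId;
    }

    @Override
    Optional<List<String>> initialize(StateStore store) {
      List<String> jobs = store.jobsByDag.get(dagId);
      // Absent or empty DAG: signal "nothing to kill" here rather than inside act().
      return (jobs == null || jobs.isEmpty()) ? Optional.empty() : Optional.of(jobs);
    }

    @Override
    Integer act(StateStore store, List<String> jobs) {
      // A real proc would cancel each job; the sketch only counts them.
      return jobs.size();
    }
  }
}
```

With this shape, `act()` can assume non-null input, and each proc type handles the "not found" case once in `initialize()`.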



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+  private final LaunchDagTask launchDagTask;
+  NewDagManager newDagManager;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager newDagManager) {
+    this.launchDagTask = launchDagTask;
+    this.newDagManager = newDagManager;
+  }
+
+  @Override
+  protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+  }
+
+  @Override
+  protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+    if (dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return null;
+    }

Review Comment:
   prefer in `initialize`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+  Dag<JobExecutionPlan> getDag(String dagId);
+  Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+  boolean containsDag(String dagId);
+  Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+  Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  boolean addFailedDag(String dagId);
+  boolean existsFailedDag(String dagId);
+  boolean addCleanUpDag(String dagId);
+  boolean checkCleanUpDag(String dagId);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;

Review Comment:
   `releaseDagAction` / `deleteDagAction` / `removeDagAction`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity, Optional<Dag<JobExecutionPlan>>> {
+
+  // should dag task be a part of dag proc?
+  private final KillDagTask killDagTask;
+
+  public KillDagProc(KillDagTask killDagTask) {
+    this.killDagTask = killDagTask;
+  }
+
+  protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));

Review Comment:
   why construct the `ArrayList`?... I thought the state store already returned one



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+  private final LaunchDagTask launchDagTask;
+  NewDagManager newDagManager;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager newDagManager) {
+    this.launchDagTask = launchDagTask;
+    this.newDagManager = newDagManager;
+  }
+
+  @Override
+  protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+  }
+
+  @Override
+  protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+    if (dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return null;
+    }
+    initializeDag(dag, dagManagementStateStore);
+    DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+    return null;
+  }
+
+  protected void initializeDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
+        dagManagementStateStore.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+        isDagRunning = true;
+      }
+    }
+
+    FlowId flowId = DagManagerUtils.getFlowId(dag);
+    NewDagManager.getDagManagerMetrics().registerFlowMetric(flowId, dag);
+
+    log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
+    //Determine the next set of jobs to run and submit them for execution
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagManagementStateStore, dagId);
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
+      dagManagementStateStore.addJobState(dagId, dagNode);
+    }
+
+    // Set flow status to running
+    DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
+    NewDagManager.getDagManagerMetrics().conditionallyMarkFlowAsState(flowId, DagManager.FlowState.RUNNING);
+
+    // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
+    // the time difference between the instant when a flow first transitions to the running state and the instant
+    // when the flow is submitted to Gobblin service.
+    if (!isDagRunning) {
+      this.orchestrationDelay.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag));
+    }
+
+    log.info("Dag {} Initialization complete.", DagManagerUtils.getFullyQualifiedDagName(dag));
+  }
+
+  /**
+   * Submit next set of Dag nodes in the Dag identified by the provided dagId
+   * @param dagId The dagId that should be processed.
+   * @return
+   * @throws IOException
+   */
+  // todo - convert to return set only
+  synchronized Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submitNext(DagManagementStateStore dagManagementStateStore,
+      String dagId) throws IOException {
+    Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(dagId);
+    Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
+    List<String> nextJobNames = new ArrayList<>();
+
+    //Submit jobs from the dag ready for execution.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+      submitJob(dagManagementStateStore, dagNode);
+      nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+    }
+
+    log.info("Submitting next nodes for dagId {}, where next jobs to be submitted are {}", dagId, nextJobNames);
+    //Checkpoint the dag state
+    newDagManager.getDagStateStore().writeCheckpoint(dag);
+
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
+    dagIdToNextJobs.put(dagId, nextNodes);
+    return dagIdToNextJobs;
+  }
+
+  /**
+   * Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}.
+   */
+  private void submitJob(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+    DagManagerUtils.incrementJobAttempt(dagNode);
+    JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+    jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+    JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+    Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+    // Run this spec on selected executor
+    SpecProducer<Spec> producer;
+    try {
+      this.newDagManager.getUserQuotaManager().checkQuota(Collections.singleton(dagNode));
+      producer = DagManagerUtils.getSpecProducer(dagNode);
+      TimingEvent jobOrchestrationTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+      // Increment job count before submitting the job onto the spec producer, in case that throws an exception.
+      // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
+      // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
+      // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
+      // Ensure that we do not double increment for flows that are retried
+      if (dagNode.getValue().getCurrentAttempts() == 1) {
+        NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+      }
+      // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
+      // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
+      // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
+      // blocks (by calling Future#get()) until the submission is completed.
+      Future<?> addSpecFuture = producer.addSpec(jobSpec);
+      dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
+      //Persist the dag
+      newDagManager.getDagStateStore().writeCheckpoint(dagManagementStateStore.getDag(DagManagerUtils.generateDagId(dagNode).toString()));

Review Comment:
   again, why keep so much state floating around, rather than consolidating it into a singular `facade` we can easily replace, all-or-nothing?
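One way to read the facade suggestion is as a single object that owns both the in-memory DAG state and checkpointing. A proc would then never reach for a second store, as `submitJob` does above with `newDagManager.getDagStateStore()`. Below is a minimal sketch under that assumption. `DagStateFacade`, `InMemoryDagStateFacade`, and `submitAll` are hypothetical names, not Gobblin APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "single facade" idea; none of these names are Gobblin APIs.
class StateFacadeSketch {

  /** The one object a proc would receive; every piece of mutable DAG state sits behind it. */
  interface DagStateFacade {
    void addDag(String dagId, List<String> jobNames);

    List<String> getJobs(String dagId);

    void checkpoint(String dagId);

    int checkpointCount();
  }

  /** In-memory implementation; a persistent one could replace it wholesale. */
  static final class InMemoryDagStateFacade implements DagStateFacade {
    private final Map<String, List<String>> dags = new HashMap<>();
    private final List<String> checkpoints = new ArrayList<>();

    @Override
    public void addDag(String dagId, List<String> jobNames) {
      dags.put(dagId, new ArrayList<>(jobNames));
    }

    @Override
    public List<String> getJobs(String dagId) {
      // Return a copy so callers cannot mutate internal state behind the facade's back.
      return new ArrayList<>(dags.getOrDefault(dagId, Collections.emptyList()));
    }

    @Override
    public void checkpoint(String dagId) {
      // Stand-in for writing the DAG to a durable store.
      checkpoints.add(dagId);
    }

    @Override
    public int checkpointCount() {
      return checkpoints.size();
    }
  }

  /** A proc step that reads and persists state only through the facade. */
  static int submitAll(DagStateFacade facade, String dagId) {
    List<String> jobs = facade.getJobs(dagId);
    // A real proc would submit each job here; the sketch only counts them.
    facade.checkpoint(dagId);
    return jobs.size();
  }
}
```

Swapping `InMemoryDagStateFacade` for another implementation replaces all of the state at once. That is the all-or-nothing property the comment asks for.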



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A {@link DagTask} responsible for handling kill tasks.
+ */
+@Alpha
+public class KillDagTask extends DagTask {
+  public KillDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+  }

Review Comment:
   isn't there a lombok annotation that will synthesize this plus call super as well?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+  private final LaunchDagTask launchDagTask;
+  NewDagManager newDagManager;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager newDagManager) {
+    this.launchDagTask = launchDagTask;
+    this.newDagManager = newDagManager;
+  }
+
+  @Override
+  protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+  }
+
+  @Override
+  protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+    if (dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return null;
+    }
+    initializeDag(dag, dagManagementStateStore);
+    DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+    return null;
+  }
+
+  protected void initializeDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
+        dagManagementStateStore.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);

Review Comment:
   couldn't these *stateful* metrics live encapsulated within the `DagManagementStateStore`?
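A minimal sketch of that encapsulation, assuming a hypothetical `MetricsAwareJobStateStore` with `String` job names in place of `DagNode`s and an `AtomicInteger` standing in for the `DagManagerMetrics` running-jobs counter. Because the store adjusts the counter inside `addJobState`/`deleteJobState`, a caller such as `initializeDag` cannot update the state without the metric, or vice versa.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical store that owns both the running-job state and the metric derived from it.
class MetricsAwareJobStateStore {
  private final Map<String, Set<String>> runningJobsByDag = new HashMap<>();
  private final AtomicInteger runningJobsGauge = new AtomicInteger();

  /** Records a running job; the gauge moves only if the job was not already tracked. */
  public synchronized void addJobState(String dagId, String jobName) {
    if (runningJobsByDag.computeIfAbsent(dagId, k -> new HashSet<>()).add(jobName)) {
      runningJobsGauge.incrementAndGet();
    }
  }

  /** Removes a job; the gauge moves only if the job was actually tracked. */
  public synchronized void deleteJobState(String dagId, String jobName) {
    Set<String> jobs = runningJobsByDag.get(dagId);
    if (jobs != null && jobs.remove(jobName)) {
      runningJobsGauge.decrementAndGet();
      if (jobs.isEmpty()) {
        runningJobsByDag.remove(dagId);
      }
    }
  }

  public int runningJobCount() {
    return runningJobsGauge.get();
  }
}
```

Tying the counter to the set membership also makes re-registration harmless: if the `RUNNING`-node loop above ever saw the same node twice, a separate `incrementRunningJobMetrics` call could double-count, whereas here the second `addJobState` is a no-op.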



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+  @Override
+  protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+    return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());

Review Comment:
   again, it would look much clearer to have the ctor take just `dagId`, rather than a specific `DagTask` type
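A minimal sketch of the constructor shape suggested here, assuming simplified stand-ins: a hypothetical `DagId` value class (its `toString()` format is invented for illustration and is not necessarily `DagManager.DagId`'s), a `DagLookup` functional interface in place of `DagManagementStateStore`, and `LaunchProcSketch` in place of `LaunchDagProc`. The proc receives only the id it needs, so launch, resume, or kill tasks can all construct it without the proc depending on a concrete `DagTask` subclass.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical value type standing in for DagManager.DagId.
final class DagId {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  DagId(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  @Override
  public String toString() {
    return flowGroup + "_" + flowName + "_" + flowExecutionId;
  }
}

// Hypothetical lookup standing in for DagManagementStateStore#getDag.
@FunctionalInterface
interface DagLookup<D> {
  Optional<D> getDag(String dagId);
}

// The proc depends only on the id, not on which DagTask subtype produced it.
class LaunchProcSketch<D> {
  private final DagId dagId;

  LaunchProcSketch(DagId dagId) {
    this.dagId = Objects.requireNonNull(dagId, "dagId");
  }

  Optional<D> initialize(DagLookup<D> store) {
    return store.getDag(dagId.toString());
  }
}
```

Returning `Optional` from the lookup also replaces the `dag == null` check in `act` with an explicit "not found" case.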



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete.
+ */
+
+@Alpha
+public abstract class DagTask {
+  @Getter public DagActionStore.DagAction dagAction;
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus;
+  @Getter DagManager.DagId dagId;
+
+  public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  public abstract DagProc host(DagTaskVisitor<DagProc> visitor);

Review Comment:
   this should be:
   ```
   public abstract <T> T host(DagTaskVisitor<T> visitor);
   ```
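A self-contained sketch of the generic visitor shape, with hypothetical names (`TaskVisitor`, `Task`, `LaunchTask`, `KillTask`, and the `meet` methods) standing in for `DagTaskVisitor` and the `DagTask` hierarchy. Declaring the type parameter on `host`, after the modifiers as Java requires, lets the same task classes serve visitors that return a `DagProc`, a `String`, or any other type, with no casts.

```java
// Hypothetical stand-in for DagTaskVisitor: one overload per concrete task type.
interface TaskVisitor<T> {
  T meet(LaunchTask task);
  T meet(KillTask task);
}

// Hypothetical stand-in for DagTask.
abstract class Task {
  // Generic method: the visitor decides the result type.
  public abstract <T> T host(TaskVisitor<T> visitor);
}

class LaunchTask extends Task {
  @Override
  public <T> T host(TaskVisitor<T> visitor) {
    return visitor.meet(this);  // static type LaunchTask selects meet(LaunchTask)
  }
}

class KillTask extends Task {
  @Override
  public <T> T host(TaskVisitor<T> visitor) {
    return visitor.meet(this);  // static type KillTask selects meet(KillTask)
  }
}

// A visitor that "builds a proc" (here just its name) ...
class ProcNameVisitor implements TaskVisitor<String> {
  @Override public String meet(LaunchTask task) { return "LaunchDagProc"; }
  @Override public String meet(KillTask task) { return "KillDagProc"; }
}

// ... and an unrelated visitor returning a different type from the same tasks.
class PriorityVisitor implements TaskVisitor<Integer> {
  @Override public Integer meet(LaunchTask task) { return 1; }
  @Override public Integer meet(KillTask task) { return 0; }
}
```

For example, `new KillTask().host(new ProcNameVisitor())` yields `"KillDagProc"`, while the same task hosting a `PriorityVisitor` yields `0`.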
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+@Alpha
+public abstract class DagTask {
+  @Getter public DagActionStore.DagAction dagAction;

Review Comment:
   I'm not sure who should be accessing this... who do you have in mind?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,92 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would pull from for processing.
+ * Implements {@link Iterator} to provide {@link DagTask}s to {@link DagProcessingEngine} as soon as they are available.
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    try {
+      DagActionStore.DagAction dagAction = this.dagActionQueue.take();  //`take` blocks until an element is available
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));

Review Comment:
   I understand punting on the actual lease arbitration, but for clarity, I'd suggest modeling that not all leases are necessarily obtained, e.g. with a private method `acquireLease` that returns either the lease status base type or an `Optional<LeaseObtainedStatus>`. Then have this method call `createDagTask` only if the lease was obtained.
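A minimal sketch of that suggestion, assuming simplified stand-ins: dag actions are plain `String` ids, a `Predicate<String>` plays the role of `MultiActiveLeaseArbiter`, and `LeaseObtained`, `LeasedTask`, and `TaskStreamSketch` are hypothetical types rather than Gobblin's `LeaseObtainedStatus`, `DagTask`, and `DagTaskStream`. `acquireLease` returns `Optional.empty()` when another host holds the lease, and `next()` keeps taking actions until one is leased, so it never returns `null`.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical stand-in for MultiActiveLeaseArbiter.LeaseObtainedStatus.
final class LeaseObtained {
  final String actionId;
  final long acquiredAtMillis;

  LeaseObtained(String actionId, long acquiredAtMillis) {
    this.actionId = actionId;
    this.acquiredAtMillis = acquiredAtMillis;
  }
}

// Hypothetical stand-in for a DagTask that carries the lease it holds.
final class LeasedTask {
  final String actionId;
  final LeaseObtained lease;

  LeasedTask(String actionId, LeaseObtained lease) {
    this.actionId = actionId;
    this.lease = lease;
  }
}

class TaskStreamSketch {
  private final BlockingQueue<String> actions = new LinkedBlockingQueue<>();
  // Stand-in arbiter: true means this host won the lease for the action.
  private final Predicate<String> arbiter;

  TaskStreamSketch(Predicate<String> arbiter) {
    this.arbiter = arbiter;
  }

  void offer(String actionId) {
    actions.add(actionId);
  }

  // Models explicitly that a lease attempt may fail.
  private Optional<LeaseObtained> acquireLease(String actionId) {
    return arbiter.test(actionId)
        ? Optional.of(new LeaseObtained(actionId, System.currentTimeMillis()))
        : Optional.empty();
  }

  /** Blocks until an action whose lease was obtained arrives; never returns null. */
  LeasedTask next() throws InterruptedException {
    while (true) {
      String actionId = actions.take();  // blocks until an element is available
      Optional<LeaseObtained> lease = acquireLease(actionId);
      if (lease.isPresent()) {
        return new LeasedTask(actionId, lease.get());
      }
      // Lease not obtained: another host owns this action, so skip it.
    }
  }
}
```

One design wrinkle: `Iterator.next()` cannot declare the checked `InterruptedException` that `BlockingQueue.take()` throws, so this sketch uses a plain method; an `Iterator`-based version would need to catch the interrupt, restore the thread's interrupt flag, and rethrow as an unchecked exception.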





Issue Time Tracking
-------------------

    Worklog Id:     (was: 904486)
    Time Spent: 17h 50m  (was: 17h 40m)

> Refactor code to move current in-memory references to new design for REST calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 17h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
