phet commented on code in PR #3825:
URL: https://github.com/apache/gobblin/pull/3825#discussion_r1431275742


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java:
##########
@@ -775,7 +776,7 @@ class MockKafkaAvroJobStatusMonitor extends 
KafkaAvroJobStatusMonitor {
     public MockKafkaAvroJobStatusMonitor(String topic, Config config, int 
numThreads,
         AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, 
GaaSObservabilityEventProducer producer)
         throws IOException, ReflectiveOperationException {
-      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer);
+      super(topic, config, numThreads, mock(JobIssueEventHandler.class), 
producer, mock(DagProcessingEngine.class));

Review Comment:
   does this code compile?  I don't see a change to 
`KafkaAvroJobStatusMonitor`...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -212,10 +223,16 @@ private void handleDagAction(DagActionStore.DagAction 
dagAction, boolean isStart
     if 
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.RESUME)) {
       dagManager.handleResumeFlowRequest(dagAction.getFlowGroup(), 
dagAction.getFlowName(),
           Long.parseLong(dagAction.getFlowExecutionId()));
+      if (isMultiLeaderDagManagerEnabled) {
+        this.dagProcessingEngine.addDagAction(dagAction);
+      }

Review Comment:
   the Null Object pattern is cleaner than checking a separate boolean every 
time.  i.e. define an interface, where one impl has an `addDagAction` that 
actually does real processing, and another "no-op" impl where `addDagAction` 
does nothing.  then instantiate the NoOp if `isMultiActive == false`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+  private final DagTaskStream dagTaskStream;
+  Optional<EventSubmitter> eventSubmitter;
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, MultiActiveLeaseArbiter multiActiveLeaseArbiter) 
{
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool = 
Executors.newScheduledThreadPool(numThreads);
+    Integer pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.dagTaskStream = dagTaskStream;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory);
+      scheduledExecutorPool.scheduleAtFixedRate(dagProcEngineThread, 0, 
pollingInterval, TimeUnit.SECONDS);

Review Comment:
   why schedule to run periodically?  can't `DagTaskStream::next` be a blocking 
call and the loop an infinite one (perhaps interruptible)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/AdvanceDagTask.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * An implementation of {@link DagTask} that is responsible for advancing the 
dag to the next node based
+ * on its current flow and job status. It is added to the {@link 
org.apache.gobblin.service.modules.orchestration.DagTaskStream}
+ * by the {@link org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor} 
after it consumes the appropriate
+ * {@link org.apache.gobblin.metrics.GobblinTrackingEvent} for the {@link 
org.apache.gobblin.service.modules.flowgraph.Dag}
+ */
+
+@Alpha
+@Slf4j
+public class AdvanceDagTask extends DagTask {
+  //todo - move it to super
+  @Getter DagManager.DagId advanceDagId;
+
+//  @Getter
+//  private final Dag.DagNode<JobExecutionPlan> dagNode;
+
+  public AdvanceDagTask(DagActionStore.DagAction dagAction, 
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+    this.advanceDagId = 
DagManagerUtils.generateDagId(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  @Override
+  public DagProc host(DagTaskVisitor<DagProc> visitor) {
+    return visitor.meet(this);
+  }

Review Comment:
   doesn't this work:
   ```
   public <T extends DagProc> T host(DagTaskVisitor<T> visitor) {
     return visitor.meet(this);
   }
   ```
   ?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+public interface DagManagement {
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * defines what to do when a job (dag node) finishes
+   * @param dagNode dag node that finished
+   * @return next set of DagNodes to run
+   * @throws IOException
+   */
+  Map<String, Set<Dag.DagNode<JobExecutionPlan>>> 
onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+  void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws 
IOException;
+  void handleJobStatusEvent(JobStatusEvent jobStatusEvent);
+  void handleKillFlowEvent(KillFlowEvent killFlowEvent);
+  void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) throws 
IOException;
+  Dag<JobExecutionPlan> getDag(String dagId);
+  boolean containsDag(String dagId);
+  Map<String, Dag<JobExecutionPlan>> getResumingDags();
+  List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+  List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+  Map<String, Long> getDagToSLA();
+  Set<String> getFailedDagIds();
+  DagStateStore getFailedDagStateStore();
+  DagStateStore getDagStateStore();
+  void setActive() throws IOException;
+  void addDag(String dagId, Dag<JobExecutionPlan> dag);

Review Comment:
   all of these--don't they belong in the `DagManagementStateStore`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.

Review Comment:
   this javadoc describes how I've presumed the class would work... but the 
impl itself looks different (e.g. `addNewDag` methods invoking a method on the 
`DagTaskStream`)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+public interface DagManagement {

Review Comment:
   am I having trouble w/ the diff?  ...I don't see any class implementing this



##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -41,6 +41,7 @@ public class ServiceConfigKeys {
   public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = 
false;
   public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
   public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
+  public static final String 
GOBBLIN_SERVICE_MULTI_ACTIVE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + 
"multiActiveDagManager.enabled";

Review Comment:
   nit: multi-active dag processing



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -94,6 +100,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
   @Getter
   private final StateStore<org.apache.gobblin.configuration.State> stateStore;
   private final ScheduledExecutorService scheduledExecutorService;
+  @Getter(AccessLevel.PUBLIC)

Review Comment:
   I'm not sure whether I'm having trouble w/ the diff... but I can't find 
anyone accessing this.
   
   correspondingly, I can't tell the intention behind it.  that really deserves 
comments to document plus a name to indicate the purpose of this `EventBus` 
(and perhaps also the type of its payload), as opposed to merely repeating its 
type name



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+  private final DagTaskStream dagTaskStream;
+  Optional<EventSubmitter> eventSubmitter;
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, MultiActiveLeaseArbiter multiActiveLeaseArbiter) 
{
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool = 
Executors.newScheduledThreadPool(numThreads);
+    Integer pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.dagTaskStream = dagTaskStream;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory);
+      scheduledExecutorPool.scheduleAtFixedRate(dagProcEngineThread, 0, 
pollingInterval, TimeUnit.SECONDS);
+    }
+  }
+
+  public void addNewDag(Dag<JobExecutionPlan> dag) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dag, 
DagActionStore.FlowActionType.LAUNCH));

Review Comment:
   the DPE should be pulling `DagTask`s from the stream (using `next()`, since 
it's an `Iterator`).  AFAICT, it should be making no other calls to the 
`DagTaskStream` (DTS)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.CleanUpDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ReloadDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.RetryDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReloadDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.RetryDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given 
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor {
+  public NewDagManager dagManager;
+  public final DagProcessingEngine dagProcessingEngine;

Review Comment:
   the proc engine uses this proc factory to transform `DagTask`s to 
`DagProc`s.  holding a ref to the DPE here would set up a circular dependency.
   
   instead, the factory should function totally independently.  when the 
`DagProcessingEngine` invokes the `DagProc`s returned by the factory, it will 
pass on to them its `DagManagementStateStore`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -110,6 +118,9 @@ public DagActionStoreChangeMonitor(String topic, Config 
config, DagManager dagMa
     this.orchestrator = orchestrator;
     this.dagActionStore = dagActionStore;
     this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    // instantiating using default ctor; subsequent PR will handle 
instantiating with multi-args ctor

Review Comment:
   nit: flag w/ `TODO` as that's understood by IDEs and highlighted



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+public interface DagManagement {
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap);

Review Comment:
   this method is unlike the others, which essentially take action upon a 
single dag.  intuitively we should justify the departure from that format.  if 
no clear rationale, we should locate the method elsewhere, to keep this 
interface cohesive



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+  private final DagTaskStream dagTaskStream;
+  Optional<EventSubmitter> eventSubmitter;
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, MultiActiveLeaseArbiter multiActiveLeaseArbiter) 
{
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool = 
Executors.newScheduledThreadPool(numThreads);
+    Integer pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.dagTaskStream = dagTaskStream;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory);
+      scheduledExecutorPool.scheduleAtFixedRate(dagProcEngineThread, 0, 
pollingInterval, TimeUnit.SECONDS);
+    }
+  }
+
+  public void addNewDag(Dag<JobExecutionPlan> dag) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dag, 
DagActionStore.FlowActionType.LAUNCH));
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) {
+    // todo - how to add dag action for for a dag node? should we create a dag 
node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+  }
+
+  public void addKillDagAction(Dag<JobExecutionPlan> dag) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dag, 
DagActionStore.FlowActionType.KILL));
+  }
+
+  public void addKillDagAction(Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dagNode, 
DagActionStore.FlowActionType.KILL));
+  }
+
+  public void addAdvanceDagAction(Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dagNode, 
DagActionStore.FlowActionType.ADVANCE));
+  }
+
+  public void addDagAction(DagActionStore.DagAction dagAction) {
+    this.dagTaskStream.addDagAction(dagAction);
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {

Review Comment:
   nit: an inner class need not repeat the name of its enclosing parent.  if 
you wanted to avoid `Thread`, maybe `ProcThread` or `EngineThread`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would 
pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterator} to provide {@link DagTask}s as soon as they are 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;

Review Comment:
   let's have the `DagProcessingEngine` pass this along to the `DagProc`s it 
executes



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would 
pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterator} to provide {@link DagTask}s as soon as they are 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction = this.dagActionQueue.poll();
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, 
System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, 
System.currentTimeMillis()));
+      //}
+    } catch (Exception e) {
+      //TODO: need to handle exceptions gracefully
+      log.error("Error creating DagTask", e);
+    }
+    return null;
+  }
+
+  // todo - move it to dag action class, and move the entire dag action class 
to gobblin-service module

Review Comment:
   +1, I don't believe this belongs here



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "pollingInterval";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+
+  private final DagTaskStream dagTaskStream;
+  Optional<EventSubmitter> eventSubmitter;
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, MultiActiveLeaseArbiter multiActiveLeaseArbiter) 
{
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool = 
Executors.newScheduledThreadPool(numThreads);
+    Integer pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.dagTaskStream = dagTaskStream;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory);
+      scheduledExecutorPool.scheduleAtFixedRate(dagProcEngineThread, 0, 
pollingInterval, TimeUnit.SECONDS);
+    }
+  }
+
+  public void addNewDag(Dag<JobExecutionPlan> dag) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dag, 
DagActionStore.FlowActionType.LAUNCH));
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) {
+    // todo - how to add dag action for a dag node? should we create a dag 
node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+  }
+
+  public void addKillDagAction(Dag<JobExecutionPlan> dag) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dag, 
DagActionStore.FlowActionType.KILL));
+  }
+
+  public void addKillDagAction(Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dagNode, 
DagActionStore.FlowActionType.KILL));
+  }
+
+  public void addAdvanceDagAction(Dag.DagNode<JobExecutionPlan> dagNode) {
+    this.dagTaskStream.addDagAction(DagManagerUtils.createDagAction(dagNode, 
DagActionStore.FlowActionType.ADVANCE));
+  }
+
+  public void addDagAction(DagActionStore.DagAction dagAction) {
+    this.dagTaskStream.addDagAction(dagAction);
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+
+    @Override
+    public void run() {
+      for (DagTaskStream it = dagTaskStream; it.hasNext(); ) {
+        DagTask dagTask = it.next();
+        DagProc dagProc = dagTask.host(dagProcFactory);
+//          dagProc.process(eventSubmitter.get(), maxRetryAttempts, 
delayRetryMillis);
+        try {
+          // todo - add retries
+          dagProc.process(dagTask);

Review Comment:
   since the `DagProc` was created from the `DagTask`, there should be no 
reason to pass the dag task once again to the dag proc created from it



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) subscribes to {@link JobStatusEvent} sent by {@link 
KafkaJobStatusMonitor}
+ * c) spawns a {@link KillDagThread} that enforces run time and job start time 
deadlines.
+ * d) spawns a {@link DagManager.FailedDagRetentionThread} that cleans failed 
dags.
+ * e) loads {@link Dag}s on service-start / set-active.
+ */
+@Slf4j
+public class NewDagManager implements DagManagement {
+  public static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
+  public static final Integer DEFAULT_NUM_THREADS = 3;
+  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + 
"numThreads";
+  private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
+  private static final String FAILED_DAG_RETENTION_TIME_UNIT = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
+  private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
+  private static final String FAILED_DAG_RETENTION_TIME = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
+  private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+  // Re-emit the final flow status if not detected within 5 minutes
+  public static final String FAILED_DAG_POLLING_INTERVAL = 
FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
+  public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+  // Default job start SLA time if configured, measured in minutes. Default is 
10 minutes
+  private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + 
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+  private final Config config;
+  private final Integer retentionPollingInterval;
+
+  private final ScheduledExecutorService scheduledExecutorPool;
+  @Getter private final DagStateStore failedDagStateStore;
+  @Getter private Set<String> failedDagIds;
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  @Getter private DagStateStore dagStateStore;
+  protected final Long defaultJobStartSlaTimeMillis;
+  @Getter private final JobStatusRetriever jobStatusRetriever;
+  @Getter private final UserQuotaManager quotaManager;
+  @Getter private final Optional<Timer> jobStatusPolledTimer;
+  @Getter private final Optional<EventSubmitter> eventSubmitter;
+  private final long failedDagRetentionTime;
+  @Getter private final DagManagerMetrics dagManagerMetrics;
+  @Getter private final DagProcessingEngine dagProcessingEngine;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject(optional=true)
+  DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  protected final EventBus eventBus;
+
+  public NewDagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Optional<DagActionStore> dagActionStore, boolean instrumentationEnabled,
+      DagProcessingEngine dagProcessingEngine, DagManagementStateStore 
dagManagementStateStore)
+      throws IOException {
+    this.config = config;
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.dagActionStore = dagActionStore;
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.retentionPollingInterval = ConfigUtils.getInt(config, 
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
+    this.eventBus = KafkaJobStatusMonitor.getEventBus();
+    this.eventBus.register(this);
+    this.dagProcessingEngine = dagProcessingEngine;
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext;
+    if (instrumentationEnabled) {
+      metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+      this.jobStatusPolledTimer = 
Optional.of(metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
+      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    } else {
+      this.jobStatusPolledTimer = Optional.absent();
+      this.eventSubmitter = Optional.absent();
+    }
+    this.dagManagerMetrics = new DagManagerMetrics();
+    TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+    this.jobStatusRetriever = jobStatusRetriever;
+    this.quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+        config);
+    TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
+    this.failedDagRetentionTime = 
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, 
DEFAULT_FAILED_DAG_RETENTION_TIME));
+    KillDagThread killDagThread = new 
KillDagThread(defaultJobStartSlaTimeMillis, this);
+    this.scheduledExecutorPool.scheduleAtFixedRate(killDagThread, 100L, 60L, 
TimeUnit.SECONDS);
+    this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX)
+        .withFallback(config), topologySpecMap);
+    setActive();
+  }
+
+  public synchronized void setActive() throws IOException {
+    this.dagStateStore = createDagStateStore(config, topologySpecMap);
+    this.failedDagIds = 
Collections.synchronizedSet(failedDagStateStore.getDagIds());
+    this.dagManagerMetrics.activate();
+    UserQuotaManager quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
+    quotaManager.init(dagStateStore.getDags());
+    DagManager.FailedDagRetentionThread
+        failedDagRetentionThread = new 
DagManager.FailedDagRetentionThread(failedDagStateStore, failedDagIds, 
failedDagRetentionTime);
+    this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 
0, retentionPollingInterval, TimeUnit.MINUTES);
+    loadDagFromDagStateStore();
+    ScheduledExecutorService houseKeepingThreadPool = 
Executors.newSingleThreadScheduledExecutor();
+    for (int delay = INITIAL_HOUSEKEEPING_THREAD_DELAY; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+      houseKeepingThreadPool.schedule(() -> {
+        try {
+          loadDagFromDagStateStore();
+        } catch (Exception e ) {
+          log.error("failed to sync dag state store due to ", e);
+        }}, delay, TimeUnit.MINUTES);
+    }
+  }
+
+  public DagStateStore createDagStateStore(Config config, Map<URI, 
TopologySpec> topologySpecMap) {
+    try {
+      Class<?> dagStateStoreClass = 
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, 
FSDagStateStore.class.getName()));
+      return (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, 
topologySpecMap);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void addDag(String dagId, Dag<JobExecutionPlan> dag) {
+    this.dagManagementStateStore.addDag(dagId, dag);
+    this.dagManagementStateStore.addJobState(dagId, null);
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> getDag(String dagId) {
+    return this.dagManagementStateStore.getDag(dagId);
+  }
+
+  @Override
+  public boolean containsDag(String dagId) {
+    return this.dagManagementStateStore.containsDag(dagId);
+  }
+
+  @Override
+  public List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException {
+    return this.dagManagementStateStore.getAllJobs();
+  }

Review Comment:
   who is supposed to invoke these, and why aren't they directly using 
`DagManagementStateStore`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagStateStore;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<T extends DagTask> {
+  protected final DagProcFactory dagProcFactory;

Review Comment:
   why hold on to the factory that created this `DagProc`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagStateStore;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<T extends DagTask> {

Review Comment:
   I don't see where you're using the type param, `T`.  the specific `DagTask` 
derived type can probably be passed into the ctor for the `DagProc` derived 
type.
   
   generics would come in handy though for:
   a. the return type of `initialize()` (which is the param type of `act()`)
   b. the return type of `act()` (which is the param type of `commit()`)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagStateStore;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<T extends DagTask> {
+  protected final DagProcFactory dagProcFactory;
+  protected final UserQuotaManager quotaManager;
+  protected final Optional<EventSubmitter> eventSubmitter;
+  protected DagStateStore dagStateStore;
+  protected final AtomicLong orchestrationDelay;
+  protected final NewDagManager dagManager;
+  protected final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+  private final MetricContext metricContext;
+
+  public DagProc(DagProcFactory dagProcFactory) {
+    // todo make it cleaner
+    this.dagProcFactory = dagProcFactory;
+    this.dagManager = dagProcFactory.dagManager;
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.quotaManager = this.dagManager.getQuotaManager();
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    this.dagStateStore = this.dagManager.getDagStateStore();
+    this.orchestrationDelay = new AtomicLong(0);
+    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
+        orchestrationDelay::get);
+    metricContext.register(orchestrationDelayMetric);
+  }
+
+  public void process(DagTask dagTask) throws IOException {
+    initialize();
+    act();
+    commit();
+  }
+
+  protected abstract void initialize() throws IOException;
+
+  protected abstract void act() throws IOException;
+
+  private void commit() {
+    // todo - commit the modified dags to the persistent store
+  }
+
+  protected void initializeDag(Dag<JobExecutionPlan> dag)
+      throws IOException {
+    //Add Dag to the map of running dags
+    String dagId = DagManagerUtils.generateDagId(dag).toString();
+    log.info("Initializing Dag {}", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+    if (this.dagManager.containsDag(dagId)) {
+      log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
+      return;
+    }
+
+    this.dagManager.addDag(dagId, dag);
+    log.debug("Dag {} - determining if any jobs are already running.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+
+    //A flag to indicate if the flow is already running.
+    boolean isDagRunning = false;
+    //Are there any jobs already in the running state? This check is for Dags 
already running
+    //before a leadership change occurs.
+    for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+        this.dagManager.addJobState(dagId, dagNode);
+        //Update the running jobs counter.
+        dagManagerMetrics.incrementRunningJobMetrics(dagNode);
+        isDagRunning = true;
+      }
+    }
+
+    FlowId flowId = DagManagerUtils.getFlowId(dag);
+    this.dagManagerMetrics.registerFlowMetric(flowId, dag);
+
+    log.debug("Dag {} submitting jobs ready for execution.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+    //Determine the next set of jobs to run and submit them for execution
+    Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = 
submitNext(dagId);
+    for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
+      this.dagManager.addJobState(dagId, dagNode);
+    }
+
+    // Set flow status to running
+    DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_RUNNING);
+    dagManagerMetrics.conditionallyMarkFlowAsState(flowId, 
DagManager.FlowState.RUNNING);
+
+    // Report the orchestration delay the first time the Dag is initialized. 
Orchestration delay is defined as
+    // the time difference between the instant when a flow first transitions 
to the running state and the instant
+    // when the flow is submitted to Gobblin service.
+    if (!isDagRunning) {
+      this.orchestrationDelay.set(System.currentTimeMillis() - 
DagManagerUtils.getFlowExecId(dag));
+    }
+
+    log.info("Dag {} Initialization complete.", 
DagManagerUtils.getFullyQualifiedDagName(dag));
+  }
+
+  /**
+   * Submit next set of Dag nodes in the Dag identified by the provided dagId
+   * @param dagId The dagId that should be processed.
+   * @return
+   * @throws IOException
+   */
+  synchronized Map<String, Set<Dag.DagNode<JobExecutionPlan>>> 
submitNext(String dagId)

Review Comment:
   Isn't this the responsibility of `AdvanceDagProc`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagTask> {

Review Comment:
   I suggest getting the basics of `DagProc` impls ironed out with just one 
(any one is fine).  In the meantime, leave all of the other impls blank.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would 
pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the 
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's 
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{

Review Comment:
   Yes, this class should definitely implement `DagManagement`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to