[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=906723&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906723
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Feb/24 23:41
            Start Date: 23/Feb/24 23:41
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1501208594


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -124,6 +126,9 @@ public class ConfigurationKeys {
   public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
   public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;
 
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";

Review Comment:
   apologies if I was confused and earlier suggested moving this out of `ServiceConfigKeys` (I forget the details...)
   
   anyway, given that `ConfigurationKeys` would now need to reference `ServiceConfigKeys` (SCKs), I think it makes the most sense to situate these new keys alongside their prefix, i.e. also in SCKs
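
A minimal sketch of that layout, assuming a stand-in class (`ServiceConfigKeysSketch` is hypothetical, and the `"gobblin.service."` prefix value is an assumption made only so the example runs on its own). The point is that the new keys sit next to the prefix they are derived from, so `ConfigurationKeys` never has to reference SCKs:

```java
// Hypothetical stand-in for ServiceConfigKeys; the prefix value below is an assumption.
final class ServiceConfigKeysSketch {
  static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";

  // The new keys are declared alongside the prefix they are built from.
  static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
      GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
  static final String DAG_PROCESSING_ENGINE_ENABLED =
      GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";

  private ServiceConfigKeysSketch() {
    // constants holder, not meant to be instantiated
  }
}
```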



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected final EventSubmitter eventSubmitter;
+
+  protected final MetricContext metricContext;
+
+  public DagProc() {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }

Review Comment:
   raises an interesting Q... do we want a different `MetricContext` for each specific `DagProc` derived type?  relatedly, although orthogonal (as we could use caching), do we want to pay the cost of initialization on every Dag action we process?
   
   if we decide against a specific `MetricContext` for every `DagProc` type, let's just pass an `EventSubmitter` in when constructing each `DagProc` derived class instance.
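
To make the second option concrete, here is a rough sketch of constructor injection. Every type in it is a hypothetical stand-in (`EventSubmitterSketch` is not Gobblin's `EventSubmitter`, and the proc classes are invented for illustration); it only shows the shape, where one submitter is built once and shared rather than created per `DagProc` instance:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an event submitter; it just records event names.
final class EventSubmitterSketch {
  final List<String> submitted = new ArrayList<>();

  void submit(String eventName) {
    submitted.add(eventName);
  }
}

// The base class receives the submitter instead of building one in its constructor.
abstract class InjectedDagProcSketch {
  protected final EventSubmitterSketch eventSubmitter;

  protected InjectedDagProcSketch(EventSubmitterSketch eventSubmitter) {
    this.eventSubmitter = eventSubmitter;
  }

  abstract void process();
}

// A derived proc simply forwards the shared submitter to the base class.
final class InjectedLaunchProcSketch extends InjectedDagProcSketch {
  InjectedLaunchProcSketch(EventSubmitterSketch eventSubmitter) {
    super(eventSubmitter);
  }

  @Override
  void process() {
    eventSubmitter.submit("launch");
  }
}
```

With this shape the initialization cost is paid once by whoever constructs the procs (for example a factory), not on every Dag action.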



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected final EventSubmitter eventSubmitter;
+
+  protected final MetricContext metricContext;
+
+  public DagProc() {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    T result = act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore, result);   // todo - retry
+  }
+
+  protected abstract S initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
+
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S state) throws IOException;
+
+  protected void commit(DagManagementStateStore dagManagementStateStore, T result) {

Review Comment:
   why not make this `abstract` like the others?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String NUM_THREADS_KEY = ConfigurationKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagManagement dagManager;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagManagement dagManager, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagManager = dagManager;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagManager, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagManagement dagManager;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask<DagProc> dagTask = dagManager.next(); // blocking call
+        if (dagTask == null) {
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+        } catch (Throwable t) {
+          log.error("DagProcEngineThread encountered error ", t);
+        }
+        // todo mark lease success and releases it
+        //dagTaskStream.complete(dagTask);

Review Comment:
   FYI `conclude()` should only be called when the task is successful, so 
inside the `try` block
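
A small sketch of what that placement looks like, using hypothetical stand-ins (`LeasedTaskSketch` and `EngineLoopSketch` are invented here, and the real `conclude()` signature may differ). When `process()` throws, control jumps to the `catch` and `conclude()` is never reached:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a task whose lease is finalized by conclude().
final class LeasedTaskSketch {
  final AtomicInteger concludeCalls = new AtomicInteger();
  private final boolean failDuringProcess;

  LeasedTaskSketch(boolean failDuringProcess) {
    this.failDuringProcess = failDuringProcess;
  }

  void process() throws IOException {
    if (failDuringProcess) {
      throw new IOException("simulated processing failure");
    }
  }

  void conclude() {
    concludeCalls.incrementAndGet();
  }
}

final class EngineLoopSketch {
  // One iteration of the engine loop body.
  static void handle(LeasedTaskSketch task) {
    try {
      task.process();
      task.conclude(); // reached only when process() did not throw
    } catch (Throwable t) {
      System.err.println("task failed, lease not concluded: " + t.getMessage());
    }
  }
}
```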



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,67 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, Optional<Dag<JobExecutionPlan>>> {
+  private final LaunchDagTask launchDagTask;
+  private final AtomicLong orchestrationDelay;

Review Comment:
   suggest better naming to indicate this is an observational measurement 
recorded for metrics



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String NUM_THREADS_KEY = ConfigurationKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagManagement dagManager;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagManagement dagManager, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {

Review Comment:
   as mentioned elsewhere, once the `LeaseAttemptSuccess` becomes a Proxy object, there will be no need to take in the `MultiActiveLeaseArbiter` (MALA) param here



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -54,18 +56,30 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
   // dagToJobs holds a map of dagId to running jobs of that dag
   private final Map<DagManager.DagId, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new ConcurrentHashMap<>();
   private final Map<DagManager.DagId, Long> dagToDeadline = new ConcurrentHashMap<>();
-  private final DagStateStore dagStateStore;
-  private final DagStateStore failedDagStateStore;
+  private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
+  private boolean dagStoresInitialized = false;
   private final UserQuotaManager quotaManager;
+  @Setter Map<URI, TopologySpec> topologySpecMap;
+  private final Config config;
   private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
   public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
 
-  public MostlyMySqlDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
-    this.dagStateStore = createDagStateStore(config, topologySpecMap);
-    this.failedDagStateStore = createDagStateStore(
-        ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+  @Inject
+  public MostlyMySqlDagManagementStateStore(Config config) throws IOException {
     this.quotaManager = new MysqlUserQuotaManager(config);
-    this.quotaManager.init(getDags());
+    this.config = config;
+   }
+
+  @Override
+  public void start() throws IOException {
+    if (!dagStoresInitialized) {

Review Comment:
   not thread-safe.  is that OK or do we want synchronization?
   
   anyway, could we just calc whether "isInitialized" via `dagStateStore != 
null`?
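
One possible shape for both points, sketched with a hypothetical class (`LazyStoreSketch` is invented here; a plain `Object` stands in for the real store, and the counter exists only so the behavior can be observed). `start()` is `synchronized`, and "is initialized" is derived from the field being non-null, so no separate boolean flag is needed:

```java
final class LazyStoreSketch {
  private Object dagStateStore; // stays null until start() has run
  private int initCount = 0;    // for illustration only: counts real initializations

  // synchronized so that concurrent callers cannot both see null and initialize twice
  synchronized void start() {
    if (dagStateStore == null) {
      dagStateStore = new Object();
      initCount++;
    }
  }

  synchronized boolean isInitialized() {
    return dagStateStore != null;
  }

  synchronized int getInitCount() {
    return initCount;
  }
}
```

Whether synchronization is actually needed depends on whether `start()` can be reached from more than one thread, which this sketch does not settle.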



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived class to its companion {@link DagProc} derived class.
+ */
+
+@Alpha
+@Slf4j

Review Comment:
   suggest waiting to add `@Slf4j` until it's actually used



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.

Review Comment:
   feels misleading to say it "processes the DT".  I suggest merely clipping the first four words of the second sentence and writing "for a given DT by first..."
   
   also, small nit: "for performing", not "to performing"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected final EventSubmitter eventSubmitter;
+
+  protected final MetricContext metricContext;
+
+  public DagProc() {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+  }
+
+  public void process(DagManagementStateStore dagManagementStateStore) throws IOException {

Review Comment:
   potentially should be `final`... what do you think?
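
For reference, this is the template-method shape that `final` would lock in. The sketch uses hypothetical classes (`TemplateProcSketch` and `UppercaseProcSketch` are invented, the state-store parameter is dropped, and all three steps are assumed to be abstract); subclasses fill in the steps but cannot reorder or skip them:

```java
import java.util.ArrayList;
import java.util.List;

abstract class TemplateProcSketch<S, T> {
  final List<String> trace = new ArrayList<>(); // records which steps ran, for illustration

  // final: the initialize -> act -> commit sequence cannot be overridden
  public final void process() {
    S state = initialize();
    T result = act(state);
    commit(result);
  }

  protected abstract S initialize();

  protected abstract T act(S state);

  protected abstract void commit(T result);
}

// A toy subclass that only customizes the individual steps.
final class UppercaseProcSketch extends TemplateProcSketch<String, String> {
  @Override
  protected String initialize() {
    trace.add("initialize");
    return "dag";
  }

  @Override
  protected String act(String state) {
    trace.add("act");
    return state.toUpperCase();
  }

  @Override
  protected void commit(String result) {
    trace.add("commit:" + result);
  }
}
```

An attempt to override `process()` in a subclass would fail to compile, which is the guarantee the `final` modifier buys.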



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing operations on Dag.
+ */
+public interface DagManagement {
+
+  void addDag(FlowSpec flowSpec, Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException;
+  boolean hasNext();
+
+  DagTask<DagProc> next();

Review Comment:
   these two belong in a separate interface, which I suggest naming `DagTaskStream`.  that would `extends Iterator<DagProc>`.
   
   the reason for the separate interface is that I don't envision any client who should both `addTask` and `next`.  AFAICT a client either does ONE OR THE OTHER, not both.
   
   thereafter, one class could implement both, e.g.:
   ```
   DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
   ```
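
A rough, self-contained sketch of that split, with hypothetical simplified types (tasks are plain `String`s, `addDag` takes only an id, and every name carries a `Sketch` suffix to mark it as invented). Producers would be handed only the management view and consumers only the stream view, while a single object backs both:

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Producer-side view: clients that only add work.
interface DagManagementSketch {
  void addDag(String dagId);
}

// Consumer-side view: clients that only pull tasks.
interface DagTaskStreamSketch extends Iterator<String> {
}

// One implementation backs both views.
final class DagManagementTaskStreamImplSketch implements DagManagementSketch, DagTaskStreamSketch {
  private final Queue<String> pending = new ArrayDeque<>();

  @Override
  public void addDag(String dagId) {
    pending.add("launch:" + dagId);
  }

  @Override
  public boolean hasNext() {
    return !pending.isEmpty();
  }

  @Override
  public String next() {
    return pending.remove(); // throws NoSuchElementException when the queue is empty
  }
}
```

This sketch is neither blocking nor thread-safe; a real stream feeding several engine threads would need a concurrent queue.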



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,67 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link DagTask}.

Review Comment:
   "for {@link LaunchDagTask}"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete

Review Comment:
   describe "the contract" - if you really want to go into internal impl 
details that's ok, but at least describe it at a high-level first.  I'd 
probably give just the high-level and then stop there.
   
   e.g. "upon successful completion of the corresponding DagProc's process, 
`conclude()` MUST be called upon this `DagTask`"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.

Review Comment:
   "Each `DagTask` returned from the `DagTaskStream` comes with a time-limited 
lease conferring the exclusive right to perform the work of the task."
   
   (I specifically suggest leaving out both `DagAction`s and even the 
`MultiActiveLeaseArbiter`, which are very deep impl details from the 
perspective of the `DagProcessingEngine`.)
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<T> {
+  T meet(LaunchDagTask launchDagTask);
+}

Review Comment:
   great interface--no concerns there...
   
   1. please add javadoc
   2. also, a TODO noting that each specific `DagTask` needs its own `meet` 
method overload, so several more are forthcoming
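
A rough sketch of what the documented interface might look like. `LaunchDagTask` here is a pared-down, hypothetical stand-in (the PR's class extends `DagTask` and carries more state), and `TaskNamingVisitor` exists only to exercise the interface:

```java
/** Hypothetical stand-in for the PR's LaunchDagTask, just enough to compile on its own. */
class LaunchDagTask {
  /** Double dispatch: the task hands itself to the visitor's matching overload. */
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

/**
 * Visitor over the concrete DagTask types. An implementation derives a T
 * from whichever specific task type it meets.
 */
interface DagTaskVisitor<T> {
  // TODO: each specific DagTask needs its own `meet` overload; more are forthcoming.

  /** @return the result of visiting a launch task */
  T meet(LaunchDagTask launchDagTask);
}

/** Illustrative visitor (not part of the PR) that names the kind of task it met. */
class TaskNamingVisitor implements DagTaskVisitor<String> {
  @Override
  public String meet(LaunchDagTask launchDagTask) {
    return "launch";
  }
}
```

With one overload per task type, adding a new `DagTask` forces every visitor to handle it at compile time, which is presumably the point of the design.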



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the

Review Comment:
   after renaming the interface, it will be "from {@link DagTaskStream}"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+  @Getter public DagActionStore.DagAction dagAction;
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus;
+  @Getter DagManager.DagId dagId;
+
+  public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  public abstract T host(DagTaskVisitor<T> visitor);
+
+  /**
+   * Currently, conclusion of {@link DagTask} marks and records a successful release of lease.
+   * It is invoked after {@link DagProc#process(DagManagementStateStore)} is completed successfully.
+   * @param multiActiveLeaseArbiter
+   * @throws IOException
+   */
+  // todo call it from the right place
+  public void conclude(MultiActiveLeaseArbiter multiActiveLeaseArbiter) throws IOException {

Review Comment:
   @umustafi and I discussed having the `LeaseAttemptSuccess` remember its 
`MultiActiveLeaseArbiter`, to enable its use as a proxy for recording lease success.
   
   with that, there'd be no need for the `MultiActiveLeaseArbiter` param.  also, IIRC 
`recordLeaseSuccess` returns a boolean, so that should be the return type here, 
and it's important to note the semantics in the method javadoc
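
To make the idea concrete, a minimal sketch under stated assumptions: `LeaseArbiter`, `LeaseObtained` and `DagTaskSketch` are hypothetical, trimmed-down stand-ins for `MultiActiveLeaseArbiter`, `LeaseAttemptSuccess` and `DagTask`. What a `false` return means (taken here as "the completion could not be recorded") is also an assumption that should be checked against the real `recordLeaseSuccess`.

```java
import java.io.IOException;

/** Hypothetical, pared-down arbiter exposing only the call this sketch needs. */
interface LeaseArbiter {
  /** @return true if the lease completion was recorded (assumed semantics) */
  boolean recordLeaseSuccess(LeaseObtained status) throws IOException;
}

/** Hypothetical lease status that remembers the arbiter that granted it. */
final class LeaseObtained {
  private final LeaseArbiter arbiter;

  LeaseObtained(LeaseArbiter arbiter) {
    this.arbiter = arbiter;
  }

  /** Acts as a proxy, so holders of the lease never need the arbiter itself. */
  boolean completeLease() throws IOException {
    return arbiter.recordLeaseSuccess(this);
  }
}

/** Stand-in for DagTask showing only the lease-related part. */
abstract class DagTaskSketch {
  private final LeaseObtained leaseObtained;

  DagTaskSketch(LeaseObtained leaseObtained) {
    this.leaseObtained = leaseObtained;
  }

  /**
   * Must be called after the corresponding DagProc has processed successfully.
   *
   * @return whether the lease completion was recorded; callers should not
   *         treat the work as finalized when this is false (assumed semantics)
   */
  public final boolean conclude() throws IOException {
    return leaseObtained.completeLease();
  }
}
```

Since the status object carries its arbiter, `conclude()` takes no parameters and the engine never has to inject or pass the arbiter around.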



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.

Review Comment:
   nit: "in" => "on"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String NUM_THREADS_KEY = ConfigurationKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagManagement dagManager;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagManagement dagManager, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagManager = dagManager;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagManager, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagManagement dagManager;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask<DagProc> dagTask = dagManager.next(); // blocking call
+        if (dagTask == null) {
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);

Review Comment:
   add: `dagTask.conclude()`
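
Roughly, the ordering being asked for is "process, then conclude only on success". A minimal sketch, in which `DagProcSketch`, `ConcludableTask` and `ProcessThenConclude` are hypothetical simplifications (the real `process` takes a `DagManagementStateStore`). Leaving the lease unconcluded on failure is assumed to be the desired behaviour:

```java
/** Hypothetical minimal shape of a DagProc: it just does its work or throws. */
interface DagProcSketch {
  void process() throws Exception;
}

/** Hypothetical minimal shape of a DagTask: it can only be concluded. */
interface ConcludableTask {
  boolean conclude();
}

final class ProcessThenConclude {
  private ProcessThenConclude() {}

  /**
   * Runs the proc and concludes the task only if processing did not throw.
   *
   * @return true only if processing succeeded and the conclusion was recorded
   */
  static boolean run(DagProcSketch proc, ConcludableTask task) {
    try {
      proc.process();
    } catch (Exception e) {
      // Processing failed: do not conclude, so the lease is never marked complete.
      return false;
    }
    return task.conclude();
  }
}
```

Keeping `conclude()` outside the `try` means a failure while concluding is not mistaken for a failure of the processing itself.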



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing operations on Dag.
+ */
+public interface DagManagement {
+
+  void addDag(FlowSpec flowSpec, Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus) throws IOException;

Review Comment:
   at first I read too quickly and thought this was
   ```
   addDag(DagAction dagAction)
   ```
   BTW, where is that defined?
   
   I don't understand who uses this particular method signature.  please 
describe along w/ params in javadoc



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+
+@Slf4j
+@Alpha

Review Comment:
   javadoc
   
   ...also, what do you consider `@Alpha` about this?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}

Review Comment:
   suggest combining the two sentences.  e.g.:
   
   "The `DagProcFactory` transforms each `DagTask` into a specific, concrete `DagProc`, 
which encapsulates all processing inside `DagProc#process`"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String NUM_THREADS_KEY = ConfigurationKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagManagement dagManager;

Review Comment:
   reminder: rename to `DagTaskStream`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) load {@link Dag}s on service-start / set-active.
+ * c) accept adhoc new dag launch requests from Orchestrator.
+ */
+@Slf4j
+@Singleton
+@Data
+public class NewDagManager implements DagManagement {

Review Comment:
   I'm not clear: who's actually using this?  I don't see references to it 
within this PR
   
   or is this in some manner assuming the role of what I was elsewhere calling 
`DagManagementTaskStreamImpl`?  if so, I suggest breaking that out separately, 
so we can see what truly remains here in terms of `topologySpecMap`, 
`DagManagerMetrics` and anything else.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import javax.naming.OperationNotSupportedException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+  public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) throws OperationNotSupportedException {
+    // todo - how to add dag action for a dag node? should we create a dag node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+    throw new OperationNotSupportedException();
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          continue;

Review Comment:
   we don't believe it could happen, but perhaps it's OK to leave in 
defensively.  if so, please add `log.warn` that it was hit
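
Something along these lines would do, as a sketch only. It uses `java.util.logging` so that it stands alone, whereas the class under review would call Lombok's `log.warn(...)`. `NullTaskGuard` and the `Supplier`-based stream are hypothetical stand-ins for the engine thread and `DagTaskStream#next`:

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

final class NullTaskGuard {
  private static final Logger log = Logger.getLogger(NullTaskGuard.class.getName());

  private NullTaskGuard() {}

  /** Pulls from the stream until a non-null task arrives, warning on each unexpected null. */
  static <T> T nextNonNull(Supplier<T> stream) {
    while (true) {
      T task = stream.get(); // stands in for the blocking call
      if (task == null) {
        // Not expected to happen; log it so the defensive branch is visible if it is ever hit.
        log.warning("Received a null task from the task stream; skipping");
        continue;
      }
      return task;
    }
  }
}
```

Logging at warn level keeps the defensive branch cheap while still leaving evidence if the "impossible" case ever occurs.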



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagManagement} and 
processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type 
of {@link DagTask}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String NUM_THREADS_KEY = 
ConfigurationKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;

Review Comment:
   let's have this config live in the same place as this other one:
   ```
   GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
   ```
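
As a rough sketch of what co-locating the keys could look like: the holder class name `DagProcessingEngineConfigKeys` is hypothetical, and the literal used for the service prefix is an assumption that should be checked against `ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX`.

```java
// Hypothetical holder class; the PR would use whichever existing class is agreed on.
final class DagProcessingEngineConfigKeys {
  // Assumed value of ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX (verify against the real constant).
  static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";

  // The prefix and every key derived from it are declared side by side.
  static final String PREFIX = GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
  static final String ENABLED = PREFIX + "enabled";
  static final String NUM_THREADS_KEY = PREFIX + "numThreads";
  static final int DEFAULT_NUM_THREADS = 3;

  private DagProcessingEngineConfigKeys() {
    // constants only; not meant to be instantiated
  }
}
```

Keeping the prefix next to the keys built from it means a rename of the prefix touches one file only.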



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) load {@link Dag}s on service-start / set-active.
+ * c) accept adhoc new dag launch requests from Orchestrator.
+ */
+@Slf4j
+@Singleton
+@Data
+public class NewDagManager implements DagManagement {
+  public static final String DAG_MANAGER_PREFIX = 
"gobblin.service.dagManager.";
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+
+  private final Config config;
+  @Inject private FlowCatalog flowCatalog;
+  private final boolean dagProcessingEngineEnabled;
+  private Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new 
DagManagerMetrics();
+  private ScheduledExecutorService houseKeepingThreadPool;
+  private volatile boolean isActive = false;
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  @Inject
+  @Getter DagManagementStateStore dagManagementStateStore;
+  private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new 
LinkedBlockingQueue<>();
+
+  @Inject
+  public NewDagManager(Config config, Optional<DagActionStore> dagActionStore, 
DagManagementStateStore dagManagementStateStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, 
ConfigurationKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
+    this.dagManagementStateStore = dagManagementStateStore;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+  }
+
+  public void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManager already {}, skipping further actions.", (!active) ? 
"inactive" : "active");
+    }
+    this.isActive = active;
+    try {
+      if (this.isActive) {
+        log.info("Activating NewDagManager.");
+        //Initializing state store for persisting Dags.
+        this.dagManagementStateStore.start();
+        dagManagerMetrics.activate();
+        loadDagFromDagStateStore();
+        this.houseKeepingThreadPool = 
Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("LoadDagsThread"));
+        for (int delay = INITIAL_HOUSEKEEPING_THREAD_DELAY; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+          this.houseKeepingThreadPool.schedule(() -> {
+            try {
+              loadDagFromDagStateStore();
+            } catch (Exception e) {
+              log.error("failed to sync dag state store due to ", e);
+            }
+          }, delay, TimeUnit.MINUTES);
+        }
+      } else { //Mark the DagManager inactive.
+        log.info("Inactivating the DagManager. Shutting down all DagManager 
threads");
+        dagManagerMetrics.cleanup();
+        this.houseKeepingThreadPool.shutdown();
+      }
+    } catch (IOException e) {
+        log.error("Exception encountered when activating the new DagManager", 
e);
+        throw new RuntimeException(e);
+    }
+  }
+
+  private void loadDagFromDagStateStore() throws IOException {

Review Comment:
   How do you envision this and the housekeeping thread integrating with the 
multi-active architecture, wherein each specific `DagProc::initialize` 
implementation already fetches the most up-to-date Dag state each time before 
undertaking processing?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import javax.naming.OperationNotSupportedException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+  public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = 
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+  public static final String DAG_PROCESSING_ENGINE_ENABLED = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+
+  @Getter private final DagTaskStream dagTaskStream;
+  @Getter DagManagementStateStore dagManagementStateStore;
+
+  @Inject
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory,
+      DagManagementStateStore dagManagementStateStore, 
Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+    Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    ScheduledExecutorService scheduledExecutorPool =
+        Executors.newScheduledThreadPool(numThreads, new 
NamedThreadFactory("DagProcessingEngineThread"));
+    this.dagTaskStream = dagTaskStream;
+    this.dagManagementStateStore = dagManagementStateStore;
+
+    for (int i=0; i < numThreads; i++) {
+      DagProcEngineThread dagProcEngineThread = new 
DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+      scheduledExecutorPool.submit(dagProcEngineThread);
+    }
+  }
+
+  public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) throws 
OperationNotSupportedException {
+    // todo - how to add dag action for a dag node? should we create a dag 
node action? right now dag action is synonymous to flow action
+    // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+    throw new OperationNotSupportedException();
+  }
+
+  @AllArgsConstructor
+  private static class DagProcEngineThread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+
+    @Override
+    public void run() {
+      while (true) {
+        DagTask dagTask = dagTaskStream.next(); // blocking call
+        if (dagTask == null) {
+          continue;
+        }
+        DagProc dagProc = dagTask.host(dagProcFactory);
+        try {
+          // todo - add retries
+          dagProc.process(dagManagementStateStore);
+        } catch (Throwable t) {
+          log.error("DagProcEngineThread encountered error ", t);
+        }
+        // todo mark lease success and releases it
+        //dagTaskStream.complete(dagTask);
+      }

Review Comment:
   One thought is to always set a reminder behind the scenes, even when the 
lease is obtained. That way, even a failed attempt does not leak the 
task/action on whichever participant/host made the attempt.
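
A minimal sketch of the "always set a reminder" idea, under stated assumptions: every name below (`ReminderingProcessor`, `attempt`, `hasReminder`) is hypothetical and not taken from the PR, and an in-memory set stands in for whatever reminder mechanism the real lease arbiter would use.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from the PR.
class ReminderingProcessor {
  // Actions that still have an outstanding reminder and would be revisited later.
  private final Set<String> pendingReminders = new HashSet<>();

  /** Records a reminder first, then clears it only if processing succeeds. */
  boolean attempt(String actionId, boolean leaseObtained, Consumer<String> work) {
    pendingReminders.add(actionId); // set the reminder unconditionally
    if (!leaseObtained) {
      return false; // another participant holds the lease; the reminder re-checks later
    }
    try {
      work.accept(actionId);
    } catch (RuntimeException e) {
      return false; // failed attempt: the reminder stays, so the action is not leaked
    }
    pendingReminders.remove(actionId); // success: the reminder is no longer needed
    return true;
  }

  boolean hasReminder(String actionId) {
    return pendingReminders.contains(actionId);
  }
}
```

With this shape, the failure path needs no extra bookkeeping: whether the lease was missed or processing threw, the reminder is already in place.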





Issue Time Tracking
-------------------

    Worklog Id:     (was: 906723)
    Time Spent: 22h 40m  (was: 22.5h)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 22h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
