phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1342955936


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states

Review Comment:
   nit: strike "An interface to provide"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via 
REST client for adhoc flows.
+   * The eventTimestamp for adhoc flows will always be 0 (zero), while -1 
would indicate failures.
+   * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+   * Future implementations will cover launch of flows through the scheduler 
too!

Review Comment:
   rather than documenting this to be merely informative, could we instead 
rephrase as an interface req/policy? 
   
   (I'm uncertain of your overall plan for the impl's validation, so 
this is based solely on reading the javadoc in isolation...)
   
   e.g. "throws `IllegalArgumentException` when `eventTimestamp < 0`"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.CleanUpDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given 
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private JobStatusRetriever jobStatusRetriever;
+  private FlowStatusGenerator flowStatusGenerator;
+  private UserQuotaManager quotaManager;
+  private SpecCompiler specCompiler;
+  private FlowCatalog flowCatalog;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private Config config;
+  private Optional<EventSubmitter> eventSubmitter;
+  private boolean instrumentationEnabled;
+
+  public DagProcFactory(Config config, DagManagementStateStore 
dagManagementStateStore, JobStatusRetriever jobStatusRetriever,
+      FlowStatusGenerator flowStatusGenerator, UserQuotaManager quotaManager, 
SpecCompiler specCompiler, FlowCatalog flowCatalog, 
FlowCompilationValidationHelper flowCompilationValidationHelper, boolean 
instrumentationEnabled) {
+
+    this.config = config;
+    this.dagManagementStateStore = dagManagementStateStore;
+    this.jobStatusRetriever = jobStatusRetriever;
+    this.flowStatusGenerator = flowStatusGenerator;
+    this.quotaManager = quotaManager;
+    this.specCompiler = specCompiler;
+    this.flowCatalog = flowCatalog;
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+    this.instrumentationEnabled = instrumentationEnabled;

Review Comment:
   these look like they could be `private final`... is that true?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines

Review Comment:
   rather than "responsible for *defining the behavior*...", this seems more like the 
means by which such tasks are triggered.
   
   that said, it may be too much impl. detail to mention `DagTask` in the javadoc, 
since I don't see it actually used in the code here.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);
+
+  public Dag<JobExecutionPlan> getDag(String dagId);
+
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) 
throws IOException;
+
+  public boolean addFailedDagId(String dagId);
+
+  public boolean checkFailedDagId(String dagId);

Review Comment:
   unclear: what is meant by "check" - for membership/existence?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;

Review Comment:
   `final`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {

Review Comment:
   nit: extra blank line after `try {`; also, `for(` is missing a space (should be `for (`)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+
+
+/**
+ * Interface defining {@link DagTask} based on the type of visitor.
+ * @param <DagProc>
+ */
+@Alpha
+public interface DagTaskVisitor<DagProc> {

Review Comment:
   nit: convention would be `T` (or at most `DagProcType`).  naming it 
after an actual type (even though not imported) is quite confusing.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {
+            DagProc dagProc = dagTask.host(dagProcFactory);
+            dagProc.process(dagManagementStateStore, eventSubmitter.get(), 
maxRetryAttempts, delayRetryMillis);
+            //marks lease success and releases it
+            dagTaskStream.complete(dagTask);

Review Comment:
   first off, I absolutely LOVE how simple this impl is--even as it segregates 
responsibility elsewhere for deciding **how** to process each specific task 
type.
   
   that said, I wonder whether we might go further and encapsulate the 
`complete`/commit.
   
   e.g. the stream could construct each task w/ a:
   ```
   @FunctionalInterface
   interface Committer {
     void commit();
   }
   ```
   that would internally store the lease.  the `DagProcFactory` would then take 
that from the task and preserve it in the proc, so the last step of the proc's 
`process` could call it



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -108,7 +108,7 @@ static DagManager.DagId generateDagId(String flowGroup, 
String flowName, long fl
     return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
   }
 
-  static DagManager.DagId generateDagId(String flowGroup, String flowName, 
String flowExecutionId) {
+  public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, String flowExecutionId) {

Review Comment:
   I see... if already widely used, no need to rip out a method you did not 
add.  but if only once or twice, I suggest getting rid of it.
   
   contrary to your reply, I do not observe this impl converting between 
`String` and `long`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle kill tasks.
+ */
+@Alpha
+public class KillDagTask extends DagTask {
+
+  @Getter
+  private final DagManager.DagId killDagId;
+  protected final DagActionStore.DagAction killAction;
+
+  public KillDagTask(DagActionStore.DagAction killAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+
+    this.killAction = killAction;
+    this.leaseObtainedStatusStatus = leaseObtainedStatus;

Review Comment:
   should this be initialized by a call to `super()`?
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, 
maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, 
maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, 
delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);

Review Comment:
   I use checked exceptions sparingly, but here one may actually make sense 
(rather than `RuntimeException`).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);

Review Comment:
   don't we have two kinds of SLA?  also, since this has never been an SLA, but rather 
a **deadline**, would now be an opportunity to adopt the accurate nomenclature?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";

Review Comment:
   I agree w/ a key, like 'xyz.enabled'... but don't believe that should do 
double duty as the prefix for other configs.  let's define `PREFIX = "xyz"` and 
then have `PREFIX + ".enabled"` merely be a stand-alone key



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {

Review Comment:
   eventually, retries may not be governed by a static count, but rather 
by tracking how much time remains in the lease



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {
+            DagProc dagProc = dagTask.host(dagProcFactory);
+            dagProc.process(dagManagementStateStore, eventSubmitter.get(), 
maxRetryAttempts, delayRetryMillis);
+            //marks lease success and releases it
+            dagTaskStream.complete(dagTask);
+        }
+      } catch (Exception ex) {
+        //TODO: need to handle exceptions gracefully
+        log.error(String.format("Exception encountered in %s", 
getClass().getName()), ex);
+        throw new RuntimeException(ex);
+      }

Review Comment:
   yes, BIG TIME TODO!  (right now, it looks like just one exception would crash this thread!)
   
   suggestion: borrow from the current `DagManagerThread` handling



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to