[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882974
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/23 18:27
            Start Date: 02/Oct/23 18:27
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1342955936


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states

Review Comment:
   nit: strike "An interface to provide"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  /**
+   * Currently, it is handling just the launch of a {@link Dag} request via 
REST client for adhoc flows.
+   * The eventTimestamp for adhoc flows will always be 0 (zero), while -1 
would indicate failures.
+   * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+   * Future implementations will cover launch of flows through the scheduler 
too!

Review Comment:
   rather than documenting this to be merely informative, could we instead 
rephrase as an interface req/policy? 
   
   (I'm uncertain about your overall plan for the impl's validation, so this is 
based solely on reading the javadoc in isolation...)
   
   e.g. throws `IllegalArgumentException` when `eventTimestamp < 0`
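
   A minimal sketch of how that contract could read, assuming a hypothetical 
`launchFlow` method (the method name, its other parameters, and both type names 
are illustrative and not taken from the PR; only `eventTimestamp` is):

```java
// Hypothetical interface: states the requirement as a contract instead of a note.
interface LaunchRequestHandlerSketch {
  /**
   * @param eventTimestamp must be >= 0 (0 for adhoc flows)
   * @throws IllegalArgumentException when {@code eventTimestamp < 0}
   */
  void launchFlow(String flowGroup, String flowName, long eventTimestamp);
}

// Hypothetical implementation that enforces the documented precondition.
final class ValidatingLaunchRequestHandlerSketch implements LaunchRequestHandlerSketch {
  private int launchCount = 0;

  @Override
  public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
    if (eventTimestamp < 0) {
      throw new IllegalArgumentException("eventTimestamp must be >= 0, got " + eventTimestamp);
    }
    // Stand-in for the real launch work: only count accepted requests.
    launchCount++;
  }

  int getLaunchCount() {
    return launchCount;
  }
}
```

   With the policy in the javadoc, callers learn the failure mode from the 
interface alone, whatever the impl ends up doing for validation.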



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.CleanUpDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given 
{@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+  private DagManagementStateStore dagManagementStateStore;
+  private JobStatusRetriever jobStatusRetriever;
+  private FlowStatusGenerator flowStatusGenerator;
+  private UserQuotaManager quotaManager;
+  private SpecCompiler specCompiler;
+  private FlowCatalog flowCatalog;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private Config config;
+  private Optional<EventSubmitter> eventSubmitter;
+  private boolean instrumentationEnabled;
+
+  public DagProcFactory(Config config, DagManagementStateStore 
dagManagementStateStore, JobStatusRetriever jobStatusRetriever,
+      FlowStatusGenerator flowStatusGenerator, UserQuotaManager quotaManager, 
SpecCompiler specCompiler, FlowCatalog flowCatalog, 
FlowCompilationValidationHelper flowCompilationValidationHelper, boolean 
instrumentationEnabled) {
+
+    this.config = config;
+    this.dagManagementStateStore = dagManagementStateStore;
+    this.jobStatusRetriever = jobStatusRetriever;
+    this.flowStatusGenerator = flowStatusGenerator;
+    this.quotaManager = quotaManager;
+    this.specCompiler = specCompiler;
+    this.flowCatalog = flowCatalog;
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+    this.instrumentationEnabled = instrumentationEnabled;

Review Comment:
   these look like they could be `private final`... is that true?
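
   For illustration only, a reduced sketch of the pattern, assuming every field 
is assigned exactly once in the constructor (the class and its two fields are 
hypothetical stand-ins, not the PR's `DagProcFactory`):

```java
import java.util.Objects;

// Hypothetical reduced factory: fields set only in the constructor can be final.
final class ImmutableFactorySketch {
  private final String storeName;
  private final boolean instrumentationEnabled;

  ImmutableFactorySketch(String storeName, boolean instrumentationEnabled) {
    // The compiler rejects any later reassignment of a final field.
    this.storeName = Objects.requireNonNull(storeName, "storeName");
    this.instrumentationEnabled = instrumentationEnabled;
  }

  String getStoreName() {
    return storeName;
  }

  boolean isInstrumentationEnabled() {
    return instrumentationEnabled;
  }
}
```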



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios 
for launch, resume, kill, job start
+ * and flow completion deadlines

Review Comment:
   rather than "responsible for *defining the behavior*...", this seems more like the 
means by which such tasks are triggered.
   
   that said, may be too much impl. detail to mention `DagTask` in the javadoc, 
since I don't see it actually used in the code here



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);
+
+  public Dag<JobExecutionPlan> getDag(String dagId);
+
+  public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) 
throws IOException;
+
+  public boolean addFailedDagId(String dagId);
+
+  public boolean checkFailedDagId(String dagId);

Review Comment:
   unclear: what is meant by "check" - for membership/existence?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;

Review Comment:
   `final`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {

Review Comment:
   nit: extra newline, but missing space between `for (`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+
+
+/**
+ * Interface defining {@link DagTask} based on the type of visitor.
+ * @param <DagProc>
+ */
+@Alpha
+public interface DagTaskVisitor<DagProc> {

Review Comment:
   nit: convention would be `T` (or at the most `DagProcType`).  naming it 
after an actual type (even though not imported) is quite confusing
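
   A small sketch of the conventional naming, using hypothetical stand-in types 
(every `*Sketch` name below is invented for illustration; only the `host` 
method name mirrors the PR):

```java
// The type parameter is a plain T, so it cannot be mistaken for a real class.
interface TaskVisitorSketch<T> {
  T visit(LaunchTaskSketch task);

  T visit(KillTaskSketch task);
}

// Hypothetical task base type: each task hands itself to the visitor.
abstract class TaskSketch {
  abstract <T> T host(TaskVisitorSketch<T> visitor);
}

final class LaunchTaskSketch extends TaskSketch {
  @Override
  <T> T host(TaskVisitorSketch<T> visitor) {
    return visitor.visit(this);
  }
}

final class KillTaskSketch extends TaskSketch {
  @Override
  <T> T host(TaskVisitorSketch<T> visitor) {
    return visitor.visit(this);
  }
}

// One concrete visitor binds T to String; a factory would bind it to its proc type.
final class TaskNameVisitorSketch implements TaskVisitorSketch<String> {
  @Override
  public String visit(LaunchTaskSketch task) {
    return "launch";
  }

  @Override
  public String visit(KillTaskSketch task) {
    return "kill";
  }
}
```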



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {
+            DagProc dagProc = dagTask.host(dagProcFactory);
+            dagProc.process(dagManagementStateStore, eventSubmitter.get(), 
maxRetryAttempts, delayRetryMillis);
+            //marks lease success and releases it
+            dagTaskStream.complete(dagTask);

Review Comment:
   first off, I absolutely LOVE how simple this impl is--even as it segregates 
responsibility elsewhere for deciding **how** to process each specific task 
type.
   
   that said, I wonder whether we might go further and encapsulate the 
`complete`/commit.
   
   e.g. the stream could construct each task w/ a:
   ```
   @FunctionalInterface
   interface Committer {
     void commit();
   }
   ```
   that would internally store the lease.  the `DagProcFactory` would then take 
that from the task and preserve it in the proc, so the last step of the proc's 
`process` could call it
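
   To make the idea concrete, here is a rough sketch under stated assumptions: 
the `*Sketch` classes are hypothetical stand-ins for the task, proc, and 
factory, and the lease release is simulated by appending to a list rather than 
calling any real lease arbiter.

```java
import java.util.List;

@FunctionalInterface
interface Committer {
  void commit();
}

// Hypothetical task: the stream would construct it with a committer that holds the lease.
final class TaskWithCommitterSketch {
  private final String name;
  private final Committer committer;

  TaskWithCommitterSketch(String name, Committer committer) {
    this.name = name;
    this.committer = committer;
  }

  String getName() {
    return name;
  }

  Committer getCommitter() {
    return committer;
  }
}

// Hypothetical proc: commits as the last step of its own processing.
final class ProcSketch {
  private final String taskName;
  private final Committer committer;

  ProcSketch(String taskName, Committer committer) {
    this.taskName = taskName;
    this.committer = committer;
  }

  void process(List<String> log) {
    log.add("processed " + taskName);
    committer.commit();
  }
}

// Hypothetical factory: carries the committer over from the task into the proc.
final class ProcFactorySketch {
  ProcSketch createProc(TaskWithCommitterSketch task) {
    return new ProcSketch(task.getName(), task.getCommitter());
  }
}
```

   The engine loop would then shrink to obtaining the proc and calling 
`process`, with no separate `complete` call to forget.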



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -108,7 +108,7 @@ static DagManager.DagId generateDagId(String flowGroup, 
String flowName, long fl
     return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
   }
 
-  static DagManager.DagId generateDagId(String flowGroup, String flowName, 
String flowExecutionId) {
+  public static DagManager.DagId generateDagId(String flowGroup, String 
flowName, String flowExecutionId) {

Review Comment:
   I see... if already widely used, no need to rip out a method you did not 
add.  but if only once or twice, I suggest getting rid of it.
   
   contrary to your reply, I do not observe this impl converting between 
`String` and `long`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle kill tasks.
+ */
+@Alpha
+public class KillDagTask extends DagTask {
+
+  @Getter
+  private final DagManager.DagId killDagId;
+  protected final DagActionStore.DagAction killAction;
+
+  public KillDagTask(DagActionStore.DagAction killAction, 
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+
+    this.killAction = killAction;
+    this.leaseObtainedStatusStatus = leaseObtainedStatus;

Review Comment:
   should this be initialized by a call to `super()`?
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, 
maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, 
maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, 
delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);

Review Comment:
   I use checked exceptions sparingly, but here, one may actually make sense 
(rather than `RuntimeException`)
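   
   roughly what I have in mind, as a sketch only.  `DagProcessingException` is 
a hypothetical name, and `SketchProc` is a simplified stand-in for `DagProc` 
(no retries, no state store):

   ```java
   import java.io.IOException;

   public class CheckedProcessSketch {

     /** Hypothetical checked exception; the name is illustrative only. */
     static class DagProcessingException extends Exception {
       DagProcessingException(String message, Throwable cause) {
         super(message, cause);
       }
     }

     /** Simplified stand-in for the three-phase template method. */
     abstract static class SketchProc<S, R> {
       protected abstract S initialize() throws Exception;
       protected abstract R act(S state) throws Exception;
       protected abstract void sendNotification(R result) throws Exception;

       /** The checked exception in the signature forces callers to decide what failure means. */
       public final void process() throws DagProcessingException {
         try {
           S state = initialize();
           R result = act(state);
           sendNotification(result);
         } catch (Exception ex) {
           throw new DagProcessingException("Cannot process Dag Request", ex);
         }
       }
     }

     /** A proc whose act() always fails, used to exercise the error path. */
     static SketchProc<String, Integer> failingProc() {
       return new SketchProc<String, Integer>() {
         @Override
         protected String initialize() {
           return "state";
         }

         @Override
         protected Integer act(String state) throws Exception {
           throw new IOException("store unavailable");
         }

         @Override
         protected void sendNotification(Integer result) {
         }
       };
     }

     public static void main(String[] args) {
       try {
         failingProc().process();
       } catch (DagProcessingException e) {
         // The compiler will not let the caller ignore this branch.
         System.out.println("caller must handle: " + e.getCause().getMessage());
       }
     }
   }
   ```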



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link 
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> 
dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagManager.DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException;
+
+  public void addDagSLA(String dagId, Long flowSla);
+
+  public Long getDagSLA(String dagId);

Review Comment:
   don't we have two kinds?  also, since this has never been an SLA, but rather 
a **deadline**, would now be an opportunity to adopt the accurate nomenclature?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";

Review Comment:
   I agree w/ a key, like 'xyz.enabled'... but don't believe that should do 
double duty as the prefix for other configs.  let's define `PREFIX = "xyz"` and 
then have `PREFIX + ".enabled"` merely be a stand-alone key
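   
   spelled out as a sketch, keeping `"xyz"` as the placeholder prefix (the 
real value would live wherever `ServiceConfigKeys` defines it; the class name 
here is hypothetical):

   ```java
   public class ConfigKeysSketch {

     // Placeholder prefix; "xyz" stands in for the real config namespace.
     public static final String PREFIX = "xyz";

     // The enabled flag is just one more key under the prefix...
     public static final String ENABLED_KEY = PREFIX + ".enabled";

     // ...and the other keys hang off the prefix, not off the enabled key.
     public static final String NUM_THREADS_KEY = PREFIX + ".numThreads";
     public static final String JOB_STATUS_POLLING_INTERVAL_KEY = PREFIX + ".pollingInterval";
     public static final String MAX_RETRY_ATTEMPTS = PREFIX + ".maxRetryAttempts";
     public static final String RETRY_DELAY_MS = PREFIX + ".retryDelayMillis";

     public static void main(String[] args) {
       System.out.println(ENABLED_KEY);
       System.out.println(NUM_THREADS_KEY);
       System.out.println(RETRY_DELAY_MS);
     }
   }
   ```

   note the explicit `.` separator in each suffix, so no key depends on the 
prefix happening to end with one.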



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, 
performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the 
executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore 
dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter 
eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, 
EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {

Review Comment:
   eventually, retries may not be governed by a static count, but rather by 
tracking how much time remains in the lease
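   
   one possible shape for that, as a sketch under assumptions: it presumes the 
lease can report an expiry `Instant`, which this PR does not show, and 
`retryUntil` is a hypothetical helper rather than anything in Gobblin:

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.concurrent.Callable;
   import java.util.concurrent.atomic.AtomicInteger;

   public class LeaseBoundRetrySketch {

     /**
      * Runs the action at least once, then keeps retrying after each failure
      * for as long as another delay would still finish before the lease expires.
      */
     static <T> T retryUntil(Instant leaseExpiry, Duration delay, Callable<T> action) throws Exception {
       while (true) {
         try {
           return action.call();
         } catch (Exception ex) {
           // Give up once sleeping again would run past the lease expiry.
           if (!Instant.now().plus(delay).isBefore(leaseExpiry)) {
             throw ex;
           }
           Thread.sleep(delay.toMillis());
         }
       }
     }

     public static void main(String[] args) throws Exception {
       AtomicInteger attempts = new AtomicInteger();
       // Assumed: the lease has five seconds left; the action fails twice, then succeeds.
       String result = retryUntil(Instant.now().plusSeconds(5), Duration.ofMillis(10), () -> {
         if (attempts.incrementAndGet() < 3) {
           throw new IllegalStateException("transient failure");
         }
         return "done";
       });
       System.out.println(result + " after " + attempts.get() + " attempts");
     }
   }
   ```

   the signature of `process` would then take something lease-shaped in place 
of `maxRetryCount`.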



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and 
processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link 
org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} 
associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link 
DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+  public static final String NUM_THREADS_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+  public static final String MAX_RETRY_ATTEMPTS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+  public static final String RETRY_DELAY_MS = 
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+  private static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+  private static final long DEFAULT_RETRY_DELAY_MS = 1000;
+
+  private final DagTaskStream dagTaskStream;
+  private final DagProcFactory dagProcFactory;
+  private final DagManagementStateStore dagManagementStateStore;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final MetricContext metricContext;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final Config config;
+  private final Integer numThreads;
+  private final Integer pollingInterval;
+  private final Integer maxRetryAttempts;
+  private final long delayRetryMillis;
+  private final DagProcessingEngine.Thread [] threads;
+
+
+  public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, 
DagProcFactory dagProcFactory, DagManagementStateStore dagManagementStateStore, 
MultiActiveLeaseArbiter multiActiveLeaseArbiter) {
+    this.config = config;
+    this.dagTaskStream = dagTaskStream;
+    this.dagProcFactory = dagProcFactory;
+    this.dagManagementStateStore = dagManagementStateStore;
+    metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+
+    this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
+    this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
+    this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
+    this.maxRetryAttempts = ConfigUtils.getInt(config, MAX_RETRY_ATTEMPTS, 
DEFAULT_MAX_RETRY_ATTEMPTS);
+    this.delayRetryMillis = ConfigUtils.getLong(config, RETRY_DELAY_MS, 
DEFAULT_RETRY_DELAY_MS);
+    this.threads = new DagProcessingEngine.Thread[this.numThreads];
+    for(int i=0; i < this.numThreads; i++) {
+      Thread thread = new Thread(dagTaskStream, dagProcFactory, 
dagManagementStateStore, eventSubmitter, this.maxRetryAttempts, 
this.delayRetryMillis);
+      this.threads[i] = thread;
+    }
+  }
+
+
+  @AllArgsConstructor
+  private static class Thread implements Runnable {
+
+    private DagTaskStream dagTaskStream;
+    private DagProcFactory dagProcFactory;
+    private DagManagementStateStore dagManagementStateStore;
+    private Optional<EventSubmitter> eventSubmitter;
+    private Integer maxRetryAttempts;
+    private long delayRetryMillis;
+
+    @Override
+    public void run() {
+      try {
+
+        for(DagTask dagTask : dagTaskStream) {
+            DagProc dagProc = dagTask.host(dagProcFactory);
+            dagProc.process(dagManagementStateStore, eventSubmitter.get(), 
maxRetryAttempts, delayRetryMillis);
+            //marks lease success and releases it
+            dagTaskStream.complete(dagTask);
+        }
+      } catch (Exception ex) {
+        //TODO: need to handle exceptions gracefully
+        log.error(String.format("Exception encountered in %s", 
getClass().getName()), ex);
+        throw new RuntimeException(ex);
+      }

Review Comment:
   yes, BIG TIME TODO!  (right now, it looks like just one exception would crash this thread!)
   
   suggestion: borrow from the current `DagManagerThread` handling
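   
   the general shape I would expect, as a sketch: catch per task, inside the 
loop, so one failure does not end the thread.  `Task` and `runAll` are 
hypothetical, and this is not claimed to match what `DagManagerThread` does:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;

   public class ResilientLoopSketch {

     /** Hypothetical stand-in for a unit of work pulled from the stream. */
     interface Task {
       void process() throws Exception;
     }

     /** Processes every task; a failure is recorded and the loop moves on. */
     static List<String> runAll(Iterable<Task> stream) {
       List<String> failures = new ArrayList<>();
       for (Task task : stream) {
         try {
           task.process();
         } catch (Exception ex) {
           // A real implementation would log and emit a metric here.
           failures.add(ex.getMessage());
         }
       }
       return failures;
     }

     public static void main(String[] args) {
       AtomicInteger completed = new AtomicInteger();
       List<Task> tasks = new ArrayList<>();
       tasks.add(completed::incrementAndGet);
       tasks.add(() -> {
         throw new IllegalStateException("bad task");
       });
       tasks.add(completed::incrementAndGet);

       List<String> failures = runAll(tasks);
       System.out.println("completed=" + completed.get() + " failures=" + failures);
     }
   }
   ```

   whether a failed task's lease should be released or left to expire is a 
separate decision this sketch does not address.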





Issue Time Tracking
-------------------

    Worklog Id:     (was: 882974)
    Time Spent: 12h  (was: 11h 50m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 12h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
