umustafi commented on code in PR #3825:
URL: https://github.com/apache/gobblin/pull/3825#discussion_r1426054783


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and to allow adding, deleting, and other operations on them
+ */
+public interface DagManagementStateStore {
+
+//  public void addDag(Dag<JobExecutionPlan> dag);
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;
+

Review Comment:
   Should we put a TODO on the commented-out methods?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagStateStore;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<T extends DagTask> {
+  protected final DagProcFactory dagProcFactory;
+  protected final UserQuotaManager quotaManager;

Review Comment:
   Should checking quota happen in DagProc or in DagTaskStream? (DagProc needs the dagNode to do the check.)
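One way to read this trade-off: the quota check needs the resolved dagNode, and a DagProc only has that after `initialize()`. Below is a minimal sketch of that option, assuming the check lives in the proc's `act()` step. `QuotaNode`, `SimpleQuotaChecker`, and `QuotaAwareProc` are hypothetical names and do not reflect the actual `UserQuotaManager` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the parts of a DagNode a quota check needs (only the user here).
final class QuotaNode {
  final String user;

  QuotaNode(String user) {
    this.user = user;
  }
}

// Hypothetical per-user counter; this is NOT Gobblin's UserQuotaManager API.
final class SimpleQuotaChecker {
  private final int maxRunningPerUser;
  private final Map<String, Integer> running = new HashMap<>();

  SimpleQuotaChecker(int maxRunningPerUser) {
    this.maxRunningPerUser = maxRunningPerUser;
  }

  /** Reserves a slot for the node's user; returns false if the user is already at quota. */
  synchronized boolean tryAcquire(QuotaNode node) {
    int current = running.getOrDefault(node.user, 0);
    if (current >= maxRunningPerUser) {
      return false;
    }
    running.put(node.user, current + 1);
    return true;
  }

  /** Frees a slot; the entry is dropped once the user's count reaches zero. */
  synchronized void release(QuotaNode node) {
    running.computeIfPresent(node.user, (user, count) -> count > 1 ? count - 1 : null);
  }
}

// Hypothetical proc: the node is only known after initialize(), so the check lives in act().
final class QuotaAwareProc {
  private final SimpleQuotaChecker quota;
  private QuotaNode node;

  QuotaAwareProc(SimpleQuotaChecker quota) {
    this.quota = quota;
  }

  void initialize(QuotaNode resolvedNode) {
    this.node = resolvedNode;
  }

  /** Returns true if the job may be submitted, false if there is no node or quota blocks it. */
  boolean act() {
    if (node == null) {
      return false;
    }
    return quota.tryAcquire(node); // submission to the SpecProducer would follow on success
  }
}
```

If the check lived in the task stream instead, the stream would have to resolve the dagNode itself. That duplicates the lookup which `initialize()` already performs.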



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager manages dags in memory and various mappings.
+ */
+@Slf4j
+public class NewDagManager implements DagManagement {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;

Review Comment:
   Is this copy of NewDagManager needed, or can the existing DagManager be reused? What are the key differences here? The main change we want is an abstraction around obtaining/reading dags through the `DagManagementStateStore`, so that in-memory and MySQL dag storage can be used interchangeably, right?
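Below is a minimal sketch of the interchangeable-storage abstraction. It assumes a simplified, hypothetical `SimpleDagStateStore` interface that models dags as lists of job names rather than `Dag<JobExecutionPlan>`. The in-memory backend holds the kind of maps that `NewDagManager` currently owns. A MySQL-backed class (not shown) would implement the same interface, and callers such as the hypothetical `DagCompletionChecker` would not change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified take on DagManagementStateStore: dags are lists of job names.
interface SimpleDagStateStore {
  void addDag(String dagId, List<String> jobs);
  Optional<List<String>> getDag(String dagId);
  void addJobState(String dagId, String job);
  void deleteJobState(String dagId, String job);
  boolean hasRunningJobs(String dagId);
}

// In-memory backend: the maps held by the manager would move behind this interface.
final class InMemoryDagStateStore implements SimpleDagStateStore {
  private final Map<String, List<String>> dags = new HashMap<>();
  private final Map<String, List<String>> dagToJobs = new HashMap<>();

  @Override public synchronized void addDag(String dagId, List<String> jobs) {
    dags.put(dagId, new ArrayList<>(jobs));
  }

  @Override public synchronized Optional<List<String>> getDag(String dagId) {
    return Optional.ofNullable(dags.get(dagId));
  }

  @Override public synchronized void addJobState(String dagId, String job) {
    dagToJobs.computeIfAbsent(dagId, id -> new ArrayList<>()).add(job);
  }

  @Override public synchronized void deleteJobState(String dagId, String job) {
    List<String> running = dagToJobs.get(dagId);
    if (running != null) {
      running.remove(job);
      if (running.isEmpty()) {
        dagToJobs.remove(dagId); // no running jobs left for this dag
      }
    }
  }

  @Override public synchronized boolean hasRunningJobs(String dagId) {
    List<String> running = dagToJobs.get(dagId);
    return running != null && !running.isEmpty();
  }
}

// Callers depend only on the interface, so a MySQL-backed store could be swapped in.
final class DagCompletionChecker {
  private final SimpleDagStateStore store;

  DagCompletionChecker(SimpleDagStateStore store) {
    this.store = store;
  }

  boolean isDagFinished(String dagId) {
    return store.getDag(dagId).isPresent() && !store.hasRunningJobs(dagId);
  }
}
```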



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/AdvanceDagProc.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+
+
+/**
+ * An implementation of {@link DagProc} dealing with advancing to the next node in the {@link Dag}.
+ * This Dag Procedure will deal with pending Job statuses such as: PENDING, PENDING_RESUME, PENDING_RETRY
+ * as well as jobs that have reached an end state with statuses such as: COMPLETED, FAILED and CANCELLED.
+ * Primarily, it will be responsible for polling the flow and job statuses and advancing to the next node in the dag.
+ *
+ */
+@Slf4j
+@Alpha
+public class AdvanceDagProc extends DagProc<AdvanceDagTask> {
+  private AdvanceDagTask advanceDagTask;
+  private Optional<Dag<JobExecutionPlan>> dagToAdvance;
+
+
+  public AdvanceDagProc(AdvanceDagTask advanceDagTask, DagProcFactory dagProcFactory) {
+    super(dagProcFactory);
+    this.advanceDagTask = advanceDagTask;
+  }
+
+  @Override
+  protected void initialize() throws IOException {
+    this.dagToAdvance = dagManagementStateStore.getDag(this.advanceDagTask.getAdvanceDagId().toString());
+  }
+
+  @Override
+  protected void act() {
+    if (!this.dagToAdvance.isPresent()) {
+      log.warn("No dag with id " + this.advanceDagTask.getAdvanceDagId() + " found to advance");
+      return;
+    }
+    // todo - find next dag node to run
+    Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(new JobExecutionPlan(JobSpec.builder().build(), null));

Review Comment:
   We should use the dagId to retrieve the DagNode from the DagManagementStateStore (the next dagNode without a status).
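Below is a minimal sketch of "the next dagNode without a status". It assumes a node is runnable once it has no status and all of its parents have completed. `PlanNode`, `NodeStatus`, and `NextNodeFinder` are hypothetical names. The nested map stands in for a `DagManagementStateStore` lookup by dagId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical status values for the sketch (Gobblin's ExecutionStatus has more values).
enum NodeStatus { RUNNING, COMPLETE, FAILED }

// Hypothetical, simplified DagNode: a name, parent names, and a status that stays
// null until the job has been submitted.
final class PlanNode {
  final String name;
  final List<String> parents;
  NodeStatus status;

  PlanNode(String name, List<String> parents, NodeStatus status) {
    this.name = name;
    this.parents = parents;
    this.status = status;
  }
}

final class NextNodeFinder {
  /**
   * Returns the nodes of dagId that have no status yet and whose parents have all
   * completed. The nested map is a stand-in for a state-store lookup by dagId.
   */
  static List<PlanNode> nextNodes(Map<String, Map<String, PlanNode>> store, String dagId) {
    List<PlanNode> ready = new ArrayList<>();
    Map<String, PlanNode> dag = store.get(dagId);
    if (dag == null) {
      return ready; // unknown dag: nothing to advance
    }
    for (PlanNode node : dag.values()) {
      if (node.status != null) {
        continue; // already submitted, running, or finished
      }
      boolean parentsComplete = true;
      for (String parentName : node.parents) {
        PlanNode parent = dag.get(parentName);
        if (parent == null || parent.status != NodeStatus.COMPLETE) {
          parentsComplete = false;
          break;
        }
      }
      if (parentsComplete) {
        ready.add(node);
      }
    }
    return ready;
  }
}
```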



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,575 @@
+/**
+ * NewDagManager manages dags in memory and various mappings.
+ */
+@Slf4j
+public class NewDagManager implements DagManagement {
+  public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+
+  private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+  public static final Integer DEFAULT_NUM_THREADS = 3;
+  private static final Integer TERMINATION_TIMEOUT = 30;
+  public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
+  public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
+  private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
+  private static final String FAILED_DAG_RETENTION_TIME_UNIT = FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
+  private static final String DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT = "DAYS";
+  private static final String FAILED_DAG_RETENTION_TIME = FAILED_DAG_STATESTORE_PREFIX + ".retention.time";
+  private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
+  // Re-emit the final flow status if not detected within 5 minutes
+  public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
+  public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
+  public static final String DAG_MANAGER_HEARTBEAT = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager.heartbeat-%s";
+  // Default job start SLA time if configured, measured in minutes. Default is 10 minutes
+  private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+  private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+  private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
+  private final Config config;
+  private final Integer retentionPollingInterval;
+
+  public static String getFAILED_DAG_STATESTORE_PREFIX() {
+    return NewDagManager.FAILED_DAG_STATESTORE_PREFIX;
+  }
+
+  public void addDag(String dagId, Dag<JobExecutionPlan> dag) {
+    // TODO : implement it, get some code from old dag manager
+  }
+
+
+  public Map<String, Dag<JobExecutionPlan>> getDags() {
+    return this.dags;
+  }
+
+  public Map<String, Dag<JobExecutionPlan>> getResumingDags() {
+    return this.resumingDags;
+  }
+
+  public Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> getJobToDag() {
+    return this.jobToDag;
+  }
+
+  public Map<String, Dag.DagNode<JobExecutionPlan>> getDagNodes() {
+    return this.dagNodes;
+  }
+
+  public Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> getDagToJobs() {
+    return this.dagToJobs;
+  }
+
+  public Map<String, Long> getDagToSLA() {
+    return this.dagToSLA;
+  }
+
+  public Set<String> getFailedDagIds() {
+    return this.failedDagIds;
+  }
+
+  public DagStateStore getFailedDagStateStore() {
+    return this.failedDagStateStore;
+  }
+
+  public DagStateStore getDagStateStore() {
+    return this.dagStateStore;
+  }
+
+  public Integer getNumThreads() {
+    return this.numThreads;
+  }
+
+  public JobStatusRetriever getJobStatusRetriever() {
+    return this.jobStatusRetriever;
+  }
+
+  public UserQuotaManager getQuotaManager() {
+    return this.quotaManager;
+  }
+
+  public Optional<Timer> getJobStatusPolledTimer() {
+    return this.jobStatusPolledTimer;
+  }
+
+  public Optional<EventSubmitter> getEventSubmitter() {
+    return this.eventSubmitter;
+  }
+
+  public DagManagerMetrics getDagManagerMetrics() {
+    return this.dagManagerMetrics;
+  }
+
+  public AtomicLong getOrchestrationDelay() {
+    return this.orchestrationDelay;
+  }
+
+  public DagProcessingEngine getDagProcessingEngine() {
+    return this.dagProcessingEngine;
+  }
+
+  public Optional<DagActionStore> getDagActionStore() {
+    return this.dagActionStore;
+  }
+
+  /**
+   * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
+   * <ul>
+   *   <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+   *   <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
+   *   of the job are successful.</li>
+   * </ul>
+   */
+  public enum FailureOption {
+    FINISH_RUNNING("FINISH_RUNNING"),
+    CANCEL("CANCEL"),
+    FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+    private final String failureOption;
+
+    FailureOption(final String failureOption) {
+      this.failureOption = failureOption;
+    }
+
+    @Override
+    public String toString() {
+      return this.failureOption;
+    }
+  }
+
+  private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+  private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
+  private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+
+  private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+  private final Map<String, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>();
+  private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+  final Map<String, Long> dagToSLA = new HashMap<>();
+  DagManager.DagManagerThread[] dagManagerThreads;

Review Comment:
   `DagManagement` has an API to use `DagManagementStateStore`, so we should remove these in-memory references.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -380,11 +385,23 @@ public void remove(Spec spec, Properties headers) throws IOException {
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
+      URI specUri = spec.getUri();
       //Send the dag to the DagManager to stop it.
       //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
       if (this.dagManager.isPresent()) {
-        _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-        this.dagManager.get().stopDag(spec.getUri());
+        _log.info("Forwarding cancel request for flow URI {} to DagManager.", specUri);
+        this.dagManager.get().stopDag(specUri);
+      }
+      if (this.dagProcessingEngine.isPresent()) {
+        String flowGroup = FlowSpec.Utils.getFlowGroup(specUri);
+        String flowName = FlowSpec.Utils.getFlowName(specUri);
+        List<Long> flowExecutionIds = this.newDagManager.get().getJobStatusRetriever().getLatestExecutionIdsForFlow
+            (flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+        for (long flowExecutionId : flowExecutionIds) {
+          DagActionStore.DagAction dagAction = DagManagerUtils.createDagAction(flowGroup, flowName, String.valueOf(flowExecutionId), DagActionStore.FlowActionType.KILL);
+          this.dagProcessingEngine.get().addDagAction(dagAction);

Review Comment:
   At this point we should also (or instead) add a line that stores the new dag action in the dagAction store, so that the change monitor on every host receives it. Remember, we want all actions persisted in MySQL and then coordinated across hosts.
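Below is a minimal sketch of the persist-then-coordinate flow. It assumes each host runs a change monitor that is notified of every persisted action and only then enqueues it locally. All names here (`PersistedAction`, `SharedActionStoreStub`, `CancelRequestHandler`) are hypothetical. The stub stands in for the MySQL-backed `DagActionStore` and is not its real API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical value type for a (flowGroup, flowName, flowExecutionId, actionType) tuple.
final class PersistedAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final String actionType;

  PersistedAction(String flowGroup, String flowName, String flowExecutionId, String actionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.actionType = actionType;
  }
}

// Hypothetical per-host change monitor callback.
interface ActionChangeListener {
  void onAction(PersistedAction action);
}

// In-memory stand-in for the shared, MySQL-backed store that every host observes.
final class SharedActionStoreStub {
  private final List<PersistedAction> rows = new CopyOnWriteArrayList<>();
  private final List<ActionChangeListener> monitors = new CopyOnWriteArrayList<>();

  void subscribe(ActionChangeListener monitor) {
    monitors.add(monitor);
  }

  void persist(PersistedAction action) {
    rows.add(action); // 1. the action is stored first
    for (ActionChangeListener monitor : monitors) {
      monitor.onAction(action); // 2. then every host's monitor is notified
    }
  }

  int size() {
    return rows.size();
  }
}

// Hypothetical cancel path: persist one KILL action per execution id instead of
// handing it straight to the local DagProcessingEngine.
final class CancelRequestHandler {
  private final SharedActionStoreStub store;

  CancelRequestHandler(SharedActionStoreStub store) {
    this.store = store;
  }

  void requestKill(String flowGroup, String flowName, List<Long> flowExecutionIds) {
    for (long flowExecutionId : flowExecutionIds) {
      store.persist(new PersistedAction(flowGroup, flowName, String.valueOf(flowExecutionId), "KILL"));
    }
  }
}
```

With this shape, the local `addDagAction` call would move into each host's monitor callback. Every host then sees the same persisted actions.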



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} pulls from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s to {@link DagProcessingEngine} as soon as they are available.
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction = this.dagActionQueue.poll();
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));

Review Comment:
   For now we are spoofing the `LeaseObtainedStatus` since there is no handler yet, but we should have a while loop that keeps polling until we get a DagTask for which we obtain the lease. Let's still encode that logic.
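Below is a minimal sketch of the polling loop. It assumes a lease check that returns true only when this host obtains the lease. `tryObtainLease` and `LeasedActionStream` are hypothetical stand-ins for the `MultiActiveLeaseArbiter` / flow trigger handler call, which is still a TODO. Using `take()` instead of `poll()` also means the stream never has to return null.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical sketch: tryObtainLease stands in for the lease arbiter / trigger handler call.
final class LeasedActionStream<A> {
  private final BlockingQueue<A> queue;
  private final Predicate<A> tryObtainLease;

  LeasedActionStream(BlockingQueue<A> queue, Predicate<A> tryObtainLease) {
    this.queue = queue;
    this.tryObtainLease = tryObtainLease;
  }

  LeasedActionStream(Predicate<A> tryObtainLease) {
    this(new LinkedBlockingQueue<>(), tryObtainLease);
  }

  void offer(A action) {
    queue.add(action);
  }

  /**
   * Blocks until an action is available AND this host obtains its lease.
   * Never returns null, unlike BlockingQueue.poll() on an empty queue.
   */
  A nextLeased() throws InterruptedException {
    while (true) {
      A action = queue.take(); // waits for the next action instead of returning null
      if (tryObtainLease.test(action)) {
        return action; // the caller would build the DagTask from this action
      }
      // Lease held by another host or already completed: drop it and keep polling.
    }
  }
}
```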



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
