umustafi commented on code in PR #3825:
URL: https://github.com/apache/gobblin/pull/3825#discussion_r1427341387


##########
gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java:
##########
@@ -108,6 +108,26 @@ public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String flowName,
    * @return deserialize {@link State} into a {@link JobStatus}.
    */
   protected JobStatus getJobStatus(State jobState) {
+    JobStatus.JobStatusBuilder jobStatusBuilder = createJobStatusBuilderFromState(jobState);
+
+    String contextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());
+
+    Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
+      List<Issue> issues;
+      try {
+        issues = issueRepository.getAll(contextId);
+      } catch (TroubleshooterException e) {
+        log.warn("Cannot retrieve job issues", e);
+        issues = Collections.emptyList();
+      }
+      return issues;
+    });
+
+    jobStatusBuilder.issues(jobIssues);
+    return jobStatusBuilder.build();
+  }
+
+  public static JobStatus.JobStatusBuilder createJobStatusBuilderFromState(State jobState) {

Review Comment:
   Why is this method public when the others are protected? Do we need it elsewhere?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);

Review Comment:
   Can you add a brief Javadoc explaining how these are used? I'd expect these methods to be defined in `DagStateStore`, with these acting as wrappers around them; if so, they are meant for internal use only and should be private/protected.
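   A sketch of one consequence, assuming the wrappers are meant to be internal: members of a Java interface are implicitly public and `protected` is not allowed there, so internal-only wrappers would have to move into the implementing class as `private` helpers that delegate to the store. `JobStateStore`, `InMemoryJobStateStore`, and `DagManagementSketch` are hypothetical stand-ins, and plain `String` ids replace `Dag.DagNode<JobExecutionPlan>` to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for DagStateStore: owns the job state per dag. */
interface JobStateStore {
  void addJobState(String dagId, String jobId);

  void deleteJobState(String dagId, String jobId);

  Set<String> getJobs(String dagId);
}

/** In-memory store, only to make the sketch runnable. */
class InMemoryJobStateStore implements JobStateStore {
  private final Map<String, Set<String>> jobsByDag = new HashMap<>();

  @Override
  public void addJobState(String dagId, String jobId) {
    jobsByDag.computeIfAbsent(dagId, k -> new HashSet<>()).add(jobId);
  }

  @Override
  public void deleteJobState(String dagId, String jobId) {
    Set<String> jobs = jobsByDag.get(dagId);
    if (jobs != null) {
      jobs.remove(jobId);
      if (jobs.isEmpty()) {
        jobsByDag.remove(dagId);
      }
    }
  }

  @Override
  public Set<String> getJobs(String dagId) {
    // Return a read-only copy so callers cannot mutate the store.
    return Collections.unmodifiableSet(new HashSet<>(jobsByDag.getOrDefault(dagId, Collections.emptySet())));
  }
}

/** Exposes only high-level events; the state-store wrappers stay private. */
class DagManagementSketch {
  private final JobStateStore store;

  DagManagementSketch(JobStateStore store) {
    this.store = store;
  }

  public void onJobLaunched(String dagId, String jobId) {
    addJobState(dagId, jobId);
  }

  /** Returns true when the finished job was the last one tracked for the dag. */
  public boolean onJobFinish(String dagId, String jobId) {
    deleteJobState(dagId, jobId);
    return store.getJobs(dagId).isEmpty();
  }

  /** Internal wrapper: track a launched job so later status events can find it. */
  private void addJobState(String dagId, String jobId) {
    store.addJobState(dagId, jobId);
  }

  /** Internal wrapper: forget a job once it reaches a terminal state. */
  private void deleteJobState(String dagId, String jobId) {
    store.deleteJobState(dagId, jobId);
  }
}
```

Java 9+ also allows `private` methods with bodies inside an interface, but those cannot be abstract, so they would not help an interface that only declares the contract.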



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +53,49 @@
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
 
 public class DagManagerUtils {
   static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
-  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+  public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
+  public static DagActionStore.DagAction createDagAction(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType flowActionType) {
+    return createDagAction(dag.getStartNodes().get(0), flowActionType);
+  }
+
+  public static DagActionStore.DagAction createDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType) {
+    return new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, flowActionType);
+  }
+
+  // todo - dag action object does not have any identifier to tell if it is for a complete dag or just for one dag node

Review Comment:
   Good callout. We discussed adding job-level identification to the dag action. Perhaps an empty job-level identifier could indicate whether the action applies to the complete dag.
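   A minimal sketch of that idea, assuming the dag action gains an optional job name where the empty string means "whole dag". `DagActionSketch`, its `ActionType` enum, and `isForWholeDag()` are hypothetical and are not the existing `DagActionStore.DagAction` API.

```java
import java.util.Objects;

/** Hypothetical dag action with an optional job-level identifier. */
final class DagActionSketch {
  enum ActionType { LAUNCH, KILL, RESUME }

  /** Empty job name: the action targets the complete dag rather than one dag node. */
  static final String WHOLE_DAG = "";

  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final String jobName;
  final ActionType actionType;

  private DagActionSketch(String flowGroup, String flowName, String flowExecutionId,
      String jobName, ActionType actionType) {
    this.flowGroup = Objects.requireNonNull(flowGroup);
    this.flowName = Objects.requireNonNull(flowName);
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId);
    this.jobName = Objects.requireNonNull(jobName);
    this.actionType = Objects.requireNonNull(actionType);
  }

  /** Action on the whole flow execution, e.g. a kill requested through the API. */
  static DagActionSketch forDag(String flowGroup, String flowName, String flowExecutionId, ActionType type) {
    return new DagActionSketch(flowGroup, flowName, flowExecutionId, WHOLE_DAG, type);
  }

  /** Action on a single dag node, identified by its job name. */
  static DagActionSketch forJob(String flowGroup, String flowName, String flowExecutionId,
      String jobName, ActionType type) {
    if (jobName.isEmpty()) {
      throw new IllegalArgumentException("Job-level actions need a non-empty job name");
    }
    return new DagActionSketch(flowGroup, flowName, flowExecutionId, jobName, type);
  }

  boolean isForWholeDag() {
    return jobName.isEmpty();
  }
}
```

The two factories keep the empty-string convention in one place; an `Optional<String>` job name would be an alternative that makes the distinction explicit in the type.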



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagThread.java:
##########
@@ -0,0 +1,168 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+@Slf4j
+public class KillDagThread implements Runnable {
+  private final Long defaultJobStartSlaTimeMillis;
+  NewDagManager dagManager;
+  public KillDagThread(Long defaultJobStartSlaTimeMillis, NewDagManager newDagManager) {
+    this.defaultJobStartSlaTimeMillis = defaultJobStartSlaTimeMillis;
+    this.dagManager = newDagManager;
+  }
+
+  @Override
+  public void run() {
+    for (Dag.DagNode<JobExecutionPlan> node : this.dagManager.getJobToDag().keySet()) {
+      boolean flowKilled = enforceFlowStartDeadline(node);
+      boolean jobKilled = false;
+
+      if (!flowKilled) {
+        JobStatus jobStatus = pollJobStatus(node);
+        try {
+          jobKilled = enforceJobCompletionDeadline(node, jobStatus);
+        } catch (ExecutionException | InterruptedException e) {
+          log.warn("Error getting status for dag node " + node.getId());
+          continue;
+        }
+      }
+
+      if (flowKilled || jobKilled) {
+        JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
+        jobExecutionPlan.setExecutionStatus(CANCELLED);
+        try {
+          this.dagManager.onJobFinish(node);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        String dagId = DagManagerUtils.generateDagId(node).toString();
+        this.dagManager.deleteJobState(dagId, node);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
+  }
+
+  private JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        this.dagManager.getJobStatusRetriever().getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(this.dagManager.getJobStatusPolledTimer(), System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;
+    }
+  }

Review Comment:
   This method seems like it would be used by other action threads too. Can we put it into a util class?
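   A sketch of what a shared helper could look like, assuming the retriever call is passed in as a lambda so the utility does not depend on `NewDagManager`. `JobStatusPollUtils`, `pollFirst`, and the `LongConsumer` timing hook are hypothetical names; returning `Optional` instead of `null` is a design choice, not what the current code does.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical helper that the kill, resume, and advance threads could share. */
final class JobStatusPollUtils {
  private JobStatusPollUtils() {
  }

  /**
   * Runs {@code query}, reports how long it took in nanoseconds, and returns the first status, if any.
   *
   * @param query       performs the status lookup
   * @param recordNanos receives the elapsed time, e.g. to update a timer metric
   */
  static <T> Optional<T> pollFirst(Supplier<Iterator<T>> query, LongConsumer recordNanos) {
    long start = System.nanoTime();
    Iterator<T> statuses = query.get();
    recordNanos.accept(System.nanoTime() - start);
    return statuses.hasNext() ? Optional.ofNullable(statuses.next()) : Optional.empty();
  }
}
```

In `pollStatus`, the call might then look like `pollFirst(() -> retriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup), nanos -> Instrumented.updateTimer(timer, nanos, TimeUnit.NANOSECONDS))`, where `retriever` and `timer` are whatever the calling thread already holds.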



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagThread.java:
##########
@@ -0,0 +1,168 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+@Slf4j
+public class KillDagThread implements Runnable {
+  private final Long defaultJobStartSlaTimeMillis;
+  NewDagManager dagManager;
+  public KillDagThread(Long defaultJobStartSlaTimeMillis, NewDagManager newDagManager) {
+    this.defaultJobStartSlaTimeMillis = defaultJobStartSlaTimeMillis;
+    this.dagManager = newDagManager;
+  }
+
+  @Override
+  public void run() {
+    for (Dag.DagNode<JobExecutionPlan> node : this.dagManager.getJobToDag().keySet()) {
+      boolean flowKilled = enforceFlowStartDeadline(node);
+      boolean jobKilled = false;
+
+      if (!flowKilled) {
+        JobStatus jobStatus = pollJobStatus(node);
+        try {
+          jobKilled = enforceJobCompletionDeadline(node, jobStatus);

Review Comment:
   Do we try to enforce the SLA kills first even when we receive an API request? I was not aware this was the norm. Isn't this supposed to kill the dag because of an API request, or is it used differently?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,88 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * defines what to do when a job (dag node) finishes
+   * @param dagNode dag node that finished
+   * @return next set of DagNodes to run
+   * @throws IOException
+   */
+  Map<String, Set<Dag.DagNode<JobExecutionPlan>>> onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode)
+      throws IOException;
+
+  /**
+   * submit next dag nodes to run
+   * @param dagId dag id for which next dag nodes to run
+   * @return a set of dag nodes that were submitted to run by this method
+   * @throws IOException
+   */
+  // todo : maybe return just a set
+  Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException;
+  void removeDagActionFromStore(DagManager.DagId dagIdToResume, DagActionStore.FlowActionType flowActionType)
+      throws IOException;
+
+  void handleJobStatusEvent(JobStatusEvent jobStatusEvent);
+  void handleKillFlowEvent(KillFlowEvent killFlowEvent);

Review Comment:
   Let's be consistent with spacing; this is a bit hard to read. Can you add sections for the methods? For example:
   ```
   // High Level flow actions 
   void handleKillFlowEvent...
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -285,6 +297,32 @@ public boolean equals(Object o) {
     public int hashCode() {
       return this.getValue().hashCode();
     }
+
+    private static DagNodeId createId(Config jobConfig) {
+      String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      long flowExecutionId = ConfigUtils.getLong(jobConfig, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L);
+      String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, "");
+      String jobGroup = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_GROUP_KEY, "");

Review Comment:
   Can we codify the defaults used for `jobname/group, execution id` (when none are provided) in ConfigurationKeys? We may need them again in other locations.
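   A minimal sketch of what codifying these could look like, assuming the defaults become named constants alongside the keys. The holder class, constant names, and key strings are hypothetical placeholders (not the actual `ConfigurationKeys` contents), and a plain `Map` stands in for Typesafe `Config` to keep it dependency-free; the default values `0L` and `""` are the ones `createId` uses today.

```java
import java.util.Map;

/** Hypothetical holder; in practice these constants would sit next to the keys in ConfigurationKeys. */
final class DagNodeIdDefaults {
  // Placeholder key names, for illustration only.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
  static final String JOB_NAME_KEY = "job.name";
  static final String JOB_GROUP_KEY = "job.group";

  // Shared defaults, matching the literals currently inlined in Dag.DagNode#createId.
  static final long DEFAULT_FLOW_EXECUTION_ID = 0L;
  static final String DEFAULT_JOB_NAME = "";
  static final String DEFAULT_JOB_GROUP = "";

  private DagNodeIdDefaults() {
  }

  static long flowExecutionId(Map<String, String> config) {
    String value = config.get(FLOW_EXECUTION_ID_KEY);
    return value == null ? DEFAULT_FLOW_EXECUTION_ID : Long.parseLong(value);
  }

  static String jobName(Map<String, String> config) {
    return config.getOrDefault(JOB_NAME_KEY, DEFAULT_JOB_NAME);
  }

  static String jobGroup(Map<String, String> config) {
    return config.getOrDefault(JOB_GROUP_KEY, DEFAULT_JOB_GROUP);
  }
}
```

With shared constants, `createId` could pass them to `ConfigUtils.getLong(...)` and `ConfigUtils.getString(...)` in place of the inline `0L` and `""`, and any other caller would pick up the same values.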



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,134 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.CleanUpDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ReloadDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.RetryDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReloadDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.RetryDagTask;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor {
+  // todo - check what all fields are needed by DagProc implementations
+  public NewDagManager dagManager;
+  private JobStatusRetriever jobStatusRetriever;
+  private FlowStatusGenerator flowStatusGenerator;
+  private UserQuotaManager quotaManager;
+  private SpecCompiler specCompiler;
+  private FlowCatalog flowCatalog;
+  private FlowCompilationValidationHelper flowCompilationValidationHelper;
+  private Config config;
+  public final DagProcessingEngine dagProcessingEngine;
+  public DagManagementStateStore dagManagementStateStore;
+
+  Optional<EventSubmitter> eventSubmitter;

Review Comment:
   Are these here only to be passed on to the individual DagProc classes? Can we not inject these fields into those classes instead?
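   One reading of this suggestion, sketched with hypothetical stand-in types: each proc takes only the collaborators it actually uses through its constructor, so the factory holds no pass-through fields. `StatusRetriever`, `EventSink`, `Proc`, `KillProc`, `LaunchProc`, and `ProcFactory` are illustrative names (standing in for types such as `JobStatusRetriever`, `EventSubmitter`, `DagProc`, and `DagProcFactory`), and the switch on a string action simplifies the existing visitor-based dispatch.

```java
/** Hypothetical stand-ins for the real collaborators. */
interface StatusRetriever {
  String latestStatus(String dagId);
}

interface EventSink {
  void emit(String event);
}

interface Proc {
  void process(String dagId);
}

/** Needs both collaborators, so it declares both in its constructor. */
final class KillProc implements Proc {
  private final StatusRetriever retriever;
  private final EventSink events;

  KillProc(StatusRetriever retriever, EventSink events) {
    this.retriever = retriever;
    this.events = events;
  }

  @Override
  public void process(String dagId) {
    events.emit("KILL " + dagId + " (was " + retriever.latestStatus(dagId) + ")");
  }
}

/** Only emits events, so it never receives the retriever. */
final class LaunchProc implements Proc {
  private final EventSink events;

  LaunchProc(EventSink events) {
    this.events = events;
  }

  @Override
  public void process(String dagId) {
    events.emit("LAUNCH " + dagId);
  }
}

/** The factory receives ready-made procs instead of relaying their dependencies. */
final class ProcFactory {
  private final KillProc killProc;
  private final LaunchProc launchProc;

  ProcFactory(KillProc killProc, LaunchProc launchProc) {
    this.killProc = killProc;
    this.launchProc = launchProc;
  }

  Proc forAction(String action) {
    switch (action) {
      case "KILL":
        return killProc;
      case "LAUNCH":
        return launchProc;
      default:
        throw new IllegalArgumentException("Unknown action: " + action);
    }
  }
}
```

The procs could equally be built by a dependency-injection container rather than by hand; the point is only that the factory stops holding fields it never uses itself.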



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,88 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * defines what to do when a job (dag node) finishes

Review Comment:
   nit: capitalize the first word here and in the other docs



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,66 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions
+ */
+public interface DagManagementStateStore {
+
+//  public void addDag(Dag<JobExecutionPlan> dag);
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;

Review Comment:
   This method is in the DagManagement interface as well. Does it make sense here? I'd expect it to be called in only one of these cases. Also, why do we use `dagAction` here to identify the dag to remove, instead of dagId as above?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +403,62 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+  public static void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
+    if (eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);

Review Comment:
   Why do we only submit pending status events? Can we codify that in the method's Javadoc?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{

Review Comment:
   Where does it implement `DagManagement`?
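   On a related note about this class: `DagTaskStream.hasNext()` checks `isEmpty()` and `next()` calls `poll()`, which returns `null` on an empty queue. So a consumer loop stops as soon as the queue is momentarily drained, and `next()` can hand back a `null` action if it is called without a preceding successful `hasNext()`. The Javadoc's "as soon as it's available" reads closer to a blocking `take()`. A minimal sketch of that variant, assuming a single consuming thread; `BlockingTaskStream` and the `toTask` mapping are hypothetical, with type parameters standing in for `DagActionStore.DagAction` and `DagTask`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Hypothetical blocking stream: hasNext() waits for the next action instead of returning false. */
final class BlockingTaskStream<A, T> implements Iterator<T> {
  private final BlockingQueue<A> actions;
  private final Function<A, T> toTask; // e.g. maps a dag action to a DagTask
  private A pending; // action already taken from the queue but not yet returned

  BlockingTaskStream(BlockingQueue<A> actions, Function<A, T> toTask) {
    this.actions = actions;
    this.toTask = toTask;
  }

  @Override
  public boolean hasNext() {
    if (pending != null) {
      return true;
    }
    try {
      pending = actions.take(); // blocks until an action is available
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt; treat it as end of stream
      return false;
    }
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    A action = pending;
    pending = null;
    return toTask.apply(action);
  }
}
```

Interrupting the consuming thread is the only way this iterator ends, which suits a long-running engine loop; shutting down would mean interrupting that thread.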



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,66 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions
+ */
+public interface DagManagementStateStore {
+
+//  public void addDag(Dag<JobExecutionPlan> dag);
+
+  public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  public boolean hasRunningJobs(String dagId);
+
+  public void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;

Review Comment:
   Just from reading this class and the DagManagement interface, I don't understand how the responsibilities are divided or how the two interact. Can you add detail to the Javadocs?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction = this.dagActionQueue.poll();
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));
+      //}
+    } catch (Exception e) {
+      //TODO: need to handle exceptions gracefully
+      log.error("Error creating DagTask", e);
+    }
+    return null;
+  }
+
+  // todo - move it to dag action class, and move the entire dag action class to gobblin-service module
+  private Properties getJobProperties(DagActionStore.DagAction dagAction) {
+    String dagId = String.valueOf(
+        DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId()));
+    Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagId);
+    if (dag.isPresent()) {
+      return dag.get().getStartNodes().get(0).getValue().getJobSpec().getConfigAsProperties();
+    } else {
+      throw new RuntimeException("DagAction " + dagAction + " does not exist in dag management state store");
+    }
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction,
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:
+      default:
+       throw new UnsupportedOperationException("Yet to provide implementation.");
+    }
+  }
+
+  protected void complete(DagTask dagTask) throws IOException {
+    //dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+  }
+
+  public void addDagAction(DagActionStore.DagAction dagAction) {

Review Comment:
   Consider renaming this to `addDagActionToQueue` so the name says where the action goes.
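   A minimal sketch of the renamed method. The hunk ends at the signature, so the body is an assumption: it only enqueues onto the `BlockingQueue` field that `DagTaskStream` already holds. The class name is hypothetical and `A` stands in for `DagActionStore.DagAction`.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: A stands in for DagActionStore.DagAction.
class DagActionQueueSketch<A> {
  private final BlockingQueue<A> dagActionQueue;

  DagActionQueueSketch(BlockingQueue<A> dagActionQueue) {
    this.dagActionQueue = dagActionQueue;
  }

  /**
   * Only enqueues the action; it is turned into a DagTask later, when the stream is consumed.
   * BlockingQueue#add throws IllegalStateException if a bounded queue is full.
   */
  void addDagActionToQueue(A dagAction) {
    this.dagActionQueue.add(dagAction);
  }

  int queuedActionCount() {
    return this.dagActionQueue.size();
  }
}
```

   The choice of queue method matters if the queue is ever bounded: `add` throws when full, `put` blocks until space frees up, and `offer` returns `false`. With a default (unbounded) `LinkedBlockingQueue` all three behave the same.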



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction = this.dagActionQueue.poll();
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));
+      //}
+    } catch (Exception e) {
+      //TODO: need to handle exceptions gracefully
+      log.error("Error creating DagTask", e);
+    }
+    return null;
+  }
+
+  // todo - move it to dag action class, and move the entire dag action class to gobblin-service module
+  private Properties getJobProperties(DagActionStore.DagAction dagAction) {
+    String dagId = String.valueOf(
+        DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId()));
+    Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagId);
+    if (dag.isPresent()) {
+      return dag.get().getStartNodes().get(0).getValue().getJobSpec().getConfigAsProperties();
+    } else {
+      throw new RuntimeException("DagAction " + dagAction + " does not exist in dag management state store");
+    }
+  }
+
+  private DagTask createDagTask(DagActionStore.DagAction dagAction,
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+    switch (flowActionType) {
+      case KILL:
+        return new KillDagTask(dagAction, leaseObtainedStatus);
+      case RESUME:
+      case LAUNCH:
+      case ADVANCE:

Review Comment:
   Add TODOs for the unimplemented cases (`RESUME`, `LAUNCH`, `ADVANCE`) so they get completed later.
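   A sketch of what the TODOs could look like, with the exception message naming the unsupported action type so failures are easier to trace. The enum and task types are simplified stand-ins for `DagActionStore.FlowActionType` and the `DagTask` hierarchy, the real `createDagTask` also takes the dag action and lease status, and `ResumeDagTask`, `LaunchDagTask` and `AdvanceDagTask` are hypothetical names for tasks that do not exist yet.

```java
// Hypothetical stand-ins for DagActionStore.FlowActionType and the DagTask hierarchy.
enum FlowActionType { KILL, RESUME, LAUNCH, ADVANCE }

interface DagTask {
  FlowActionType actionType();
}

class KillDagTask implements DagTask {
  public FlowActionType actionType() {
    return FlowActionType.KILL;
  }
}

class DagTaskFactory {
  static DagTask createDagTask(FlowActionType flowActionType) {
    switch (flowActionType) {
      case KILL:
        return new KillDagTask();
      case RESUME:
        // TODO: return a ResumeDagTask once it is implemented
      case LAUNCH:
        // TODO: return a LaunchDagTask once it is implemented
      case ADVANCE:
        // TODO: return an AdvanceDagTask once it is implemented
      default:
        // the unimplemented cases above fall through to here on purpose
        throw new UnsupportedOperationException(
            "DagTask for flow action type " + flowActionType + " is not implemented yet");
    }
  }
}
```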



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.base.Optional;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would pull from for processing.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+// change to iterable
+public class DagTaskStream implements Iterator<DagTask>{
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue;
+  //private FlowTriggerHandler flowTriggerHandler;
+  private final DagManagementStateStore dagManagementStateStore;
+
+  @Override
+  public boolean hasNext() {
+    return !this.dagActionQueue.isEmpty();
+  }
+
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction = this.dagActionQueue.poll();
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        // can it return null? is this iterator allowed to return null?
+        return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));

Review Comment:
   ```
   while lease not obtained {
     // poll dag action queue
     // attempt lease on it
   }
   ```
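   A minimal Java sketch of that loop, assuming a lease attempt can be modeled as a yes/no check per action. `A` stands in for `DagActionStore.DagAction` and `T` for `DagTask`; the `tryObtainLease` predicate is a hypothetical placeholder for whatever lease API the flow trigger handler ends up exposing, and the class name is made up. It uses `BlockingQueue#take`, which waits for an action instead of returning `null` on an empty queue. It deliberately does not implement `Iterator`, because `take` throws the checked `InterruptedException`, which `Iterator#next` cannot declare; an `Iterator` wrapper would have to handle interruption itself. On the inline question: `Iterator#next` is specified to throw `NoSuchElementException` when there are no more elements, so returning `null` for "nothing available" is not the conventional contract.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: A stands in for DagActionStore.DagAction, T for DagTask.
// tryObtainLease is a placeholder for a lease attempt; it returns true only if the lease was obtained.
class LeasedDagTaskStream<A, T> {
  private final BlockingQueue<A> dagActionQueue;
  private final Predicate<A> tryObtainLease;
  private final Function<A, T> createDagTask;

  LeasedDagTaskStream(BlockingQueue<A> dagActionQueue, Predicate<A> tryObtainLease,
      Function<A, T> createDagTask) {
    this.dagActionQueue = dagActionQueue;
    this.tryObtainLease = tryObtainLease;
    this.createDagTask = createDagTask;
  }

  /** Blocks until the lease on some queued action is obtained and returns its task; never returns null. */
  T next() throws InterruptedException {
    while (true) {
      // take() waits for an action instead of returning null when the queue is empty
      A dagAction = this.dagActionQueue.take();
      if (this.tryObtainLease.test(dagAction)) {
        return this.createDagTask.apply(dagAction);
      }
      // lease not obtained (e.g. another host holds it): this sketch moves on to the next action
    }
  }
}
```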



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

