[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=898826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-898826
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jan/24 01:11
            Start Date: 10/Jan/24 01:11
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446774978


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########


Review Comment:
   can we avoid some of these changes for now if no code is changed?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -17,23 +17,26 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+

Review Comment:
   same comment as above for cleaning up this pr



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 

Review Comment:
   same here let's remove some of these changes to keep PR shorter



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +389,80 @@ static List<String> getDistinctUniqueRequesters(String 
serializedRequesters) {
       throw new RuntimeException("Could not process requesters due to ", e);
     }
   }
+
+  public static void submitAndSet(Dag<JobExecutionPlan> dag, 
Optional<EventSubmitter> eventSubmitter) {
+    eventSubmitter.toJavaUtil().ifPresent(es -> {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    });
+  }
+
+  /**
+   * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+   */
+  public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, 
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName, 
jobStatusRetriever, jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link 
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+   */
+  public static  JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, 
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+    if (dag == null || dag.isEmpty()) {
+      return null;
+    }
+    Config jobConfig = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+    String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+    return pollStatus(flowGroup, flowName, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever, 
jobStatusPolledTimer);
+  }
+
+  /**
+   * Retrieve the flow's {@link JobStatus} and update the timer if 
jobStatusPolledTimer is present.
+   */
+  public static JobStatus pollStatus(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName,
+    JobStatusRetriever jobStatusRetriever, Optional<Timer> 
jobStatusPolledTimer) {
+    long pollStartTime = System.nanoTime();
+    Iterator<JobStatus> jobStatusIterator =
+        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId, jobName, jobGroup);
+    Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - 
pollStartTime, TimeUnit.NANOSECONDS);
+
+    if (jobStatusIterator.hasNext()) {
+      return jobStatusIterator.next();
+    } else {
+      return null;

Review Comment:
   can we return an Optional here instead of null to make it obvious there may 
be no status returned?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +54,42 @@
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
 
 public class DagManagerUtils {
   static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
   static String QUOTA_KEY_SEPERATOR = ",";
 
-  static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+  public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
     return getFlowId(dag.getStartNodes().get(0));
   }
 
+  public static DagActionStore.DagAction 
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType 
flowActionType) {
+    return createDagActionFromDagNode(dag.getStartNodes().get(0), 
flowActionType);
+  }
+
+  // todo - dag action object does not have any identifier to tell if it is 
for a complete dag or just for one dag node

Review Comment:
   make this `TODO:` so recognized by IntelliJ





Issue Time Tracking
-------------------

    Worklog Id:     (was: 898826)
    Time Spent: 16h  (was: 15h 50m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 16h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to