[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=898826&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-898826
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jan/24 01:11
Start Date: 10/Jan/24 01:11
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446774978
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
Review Comment:
can we avoid some of these changes for now if no code is changed?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -17,23 +17,26 @@
package org.apache.gobblin.service.monitoring;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+
Review Comment:
Same comment as above about cleaning up this PR.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -17,12 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
Review Comment:
Same here: let's remove some of these changes to keep the PR shorter.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +389,80 @@ static List<String> getDistinctUniqueRequesters(String
serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitAndSet(Dag<JobExecutionPlan> dag,
Optional<EventSubmitter> eventSubmitter) {
+ eventSubmitter.toJavaUtil().ifPresent(es -> {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ });
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName,
jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever,
jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} and update the timer if
jobStatusPolledTimer is present.
+ */
+ public static JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever jobStatusRetriever, Optional<Timer>
jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator =
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return jobStatusIterator.next();
+ } else {
+ return null;
Review Comment:
Can we return an Optional here instead of null, to make it obvious that there may
be no status returned?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -48,25 +54,42 @@
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
+
public class DagManagerUtils {
static long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
static String QUOTA_KEY_SEPERATOR = ",";
- static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
+ public static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
}
+ public static DagActionStore.DagAction
createDagActionFromDag(Dag<JobExecutionPlan> dag, DagActionStore.FlowActionType
flowActionType) {
+ return createDagActionFromDagNode(dag.getStartNodes().get(0),
flowActionType);
+ }
+
+ // todo - dag action object does not have any identifier to tell if it is
for a complete dag or just for one dag node
Review Comment:
Make this `TODO:` so that it is recognized by IntelliJ.
Issue Time Tracking
-------------------
Worklog Id: (was: 898826)
Time Spent: 16h (was: 15h 50m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 16h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)