[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=898850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-898850
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jan/24 03:54
Start Date: 10/Jan/24 03:54
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #3853:
URL: https://github.com/apache/gobblin/pull/3853#discussion_r1446855904
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -366,4 +389,80 @@ static List<String> getDistinctUniqueRequesters(String
serializedRequesters) {
throw new RuntimeException("Could not process requesters due to ", e);
}
}
+
+ public static void submitAndSet(Dag<JobExecutionPlan> dag,
Optional<EventSubmitter> eventSubmitter) {
+ eventSubmitter.toJavaUtil().ifPresent(es -> {
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
+ }
+ });
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, jobName,
jobStatusRetriever, jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY, jobStatusRetriever,
jobStatusPolledTimer);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} and update the timer if
jobStatusPolledTimer is present.
+ */
+ public static JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName,
+ JobStatusRetriever jobStatusRetriever, Optional<Timer>
jobStatusPolledTimer) {
+ long pollStartTime = System.nanoTime();
+ Iterator<JobStatus> jobStatusIterator =
+ jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId, jobName, jobGroup);
+ Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() -
pollStartTime, TimeUnit.NANOSECONDS);
+
+ if (jobStatusIterator.hasNext()) {
+ return jobStatusIterator.next();
+ } else {
+ return null;
Review Comment:
@umustafi @phet I am making the change, though it is like this on the left
side also, and is unrelated to the Dag refactoring
Issue Time Tracking
-------------------
Worklog Id: (was: 898850)
Time Spent: 17h (was: 16h 50m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 17h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)