[
https://issues.apache.org/jira/browse/GOBBLIN-2017?focusedWorklogId=922634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-922634
]
ASF GitHub Bot logged work on GOBBLIN-2017:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jun/24 19:00
Start Date: 07/Jun/24 19:00
Worklog Time Spent: 10m
Work Description: arjun4084346 commented on code in PR #3965:
URL: https://github.com/apache/gobblin/pull/3965#discussion_r1631589630
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -89,11 +86,29 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
return;
}
+ if (dagNodeWithJobStatus.getRight().isPresent()
+ && !FlowStatusGenerator.FINISHED_STATUSES.contains(dagNodeWithJobStatus.getRight().get().getEventName())) {
+ // this may happen if adding job status in the store failed after adding a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status for dagNode %s is %s. Re-evaluate dag action are created for"
+ + " new jobs with no job status when there are multiple of them to run next; or when a job finishes with status - %s",
+ dagNodeId, dagNodeWithJobStatus.getRight().get().getEventName(), FlowStatusGenerator.FINISHED_STATUSES));
+ }
+
Dag.DagNode<JobExecutionPlan> dagNode = dagNodeWithJobStatus.getLeft().get();
+
+ if (!dagNodeWithJobStatus.getRight().isPresent()) {
+ // if the job status is not present, this job was never launched, submit it now
+ submitJobForThisDagNode(dagManagementStateStore, dagNode);
+ return;
+ }
Review Comment:
It comes down to how we define the responsibilities of the launch and
reevaluate dagprocs.
I see LaunchDagProc's main responsibility as doing the preparation for
starting a dag, which includes **creating** a dag out of a flowConfig and
bringing the flow into a launchable state.
I imagine there being **only one** LaunchDagProc in the whole process. If we
create multiple LaunchDagProcs, we may again have to add clumsy code to handle
those cases, and possibly do duplicate "preparation" work.
ReevaluateDagProc comes in handy here because it is not a
"LaunchNextJobsDagProc"; it may assume the additional responsibility of doing a
broader reevaluation and may also decide to launch the current job when needed.
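To make the branching concrete, here is a minimal, self-contained sketch of the three outcomes the hunk above distinguishes. It is not the Gobblin implementation: the class `ReevaluateSketch`, the `Decision` enum, the `decide` method, and the contents of `FINISHED` are all hypothetical stand-ins, and it throws `IllegalStateException` where the PR throws a plain `RuntimeException`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class ReevaluateSketch {
  // Assumption: stand-in for FlowStatusGenerator.FINISHED_STATUSES; the real set may differ.
  static final Set<String> FINISHED =
      new HashSet<>(Arrays.asList("COMPLETE", "FAILED", "CANCELLED"));

  // Hypothetical labels for what a reevaluation could decide to do.
  enum Decision { SUBMIT_THIS_JOB, EVALUATE_NEXT_JOBS }

  static Decision decide(Optional<String> jobStatus) {
    if (!jobStatus.isPresent()) {
      // No status recorded: the job was never launched, so submit it now.
      return Decision.SUBMIT_THIS_JOB;
    }
    if (!FINISHED.contains(jobStatus.get())) {
      // A status exists but is not terminal: reevaluation should not have been triggered.
      throw new IllegalStateException("Unexpected non-terminal job status: " + jobStatus.get());
    }
    // The job finished: continue with the broader reevaluation of the dag.
    return Decision.EVALUATE_NEXT_JOBS;
  }

  public static void main(String[] args) {
    System.out.println(decide(Optional.empty()));
    System.out.println(decide(Optional.of("COMPLETE")));
  }
}
```

Under this reading, a single reevaluation entry point covers both "launch the job that was never started" and "a job finished, decide what runs next", which is why no second LaunchDagProc is needed.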
Issue Time Tracking
-------------------
Worklog Id: (was: 922634)
Time Spent: 2.5h (was: 2h 20m)
> divide multiple job launches in a LaunchDagProc into multiple LaunchDagActions
> ------------------------------------------------------------------------------
>
> Key: GOBBLIN-2017
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2017
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Divide multiple job launches in a LaunchDagProc into multiple
> LaunchDagActions, for two reasons:
> 1) each dag proc will then spend less time processing and have a better
> chance of completing the operation within the lease time
> 2) handling partial job submissions in one LaunchDagProc that sends N jobs is
> difficult