Will-Lo commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1534657694
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
+ try {
+ numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+
+ int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+ if (numWUsProcessed != numWUsGenerated) {
+ log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
+ // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ }
}
+ timer.stop();
+ } catch (Exception e) {
+ // Emit a failed GobblinTrackingEvent to record job failures
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
Review Comment:
GaaS needs to get a failed event which the `AbstractJobLauncher` emits
otherwise the job will be stalled indefinitely until it is SLA killed, or the
failure will be hidden as a success. How I see it is the
`ExecuteGobblinWorkflowImpl` is the Temporal version of the
`AbstractJobLauncher` which should capture failures within each sub
workflow/activity. So from a service provider standpoint if the job fails we
want to easily see which workflow failed by loudly throwing an exception there,
and then catch the exception in the parent workflow and emit the GTE to capture
any failures in either generating workunits, processing workunits, or
committing workunits. To emulate MR behavior and not fail due to failed
activities in process WUs we want to modify the code in the ProcessWU Workflow
to be resilient and have the right retries set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]