[
https://issues.apache.org/jira/browse/GOBBLIN-2020?focusedWorklogId=910964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-910964
]
ASF GitHub Bot logged work on GOBBLIN-2020:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Mar/24 20:45
Start Date: 21/Mar/24 20:45
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1534657694
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
-      if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE) &&
-          jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
-        int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong! (retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+        if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+          int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong! (retryable or non-retryable error??)
+        }
       }
+      timer.stop();
+    } catch (Exception e) {
+      // Emit a failed GobblinTrackingEvent to record job failures
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
Review Comment:
GaaS needs to receive the failed event that the `AbstractJobLauncher` emits;
otherwise the job will stall indefinitely until it is SLA killed, or the
failure will be hidden as a success. As I see it, `ExecuteGobblinWorkflowImpl`
is the Temporal version of the `AbstractJobLauncher`, so it should capture
failures within each sub workflow/activity. From a service provider standpoint,
if the job fails we want to easily see which workflow failed by loudly throwing
an exception there, and then catch that exception in the parent workflow and
emit the GTE, so that any failure in generating, processing, or committing
workunits is captured. To emulate MR behavior and not fail the job because of
failed activities in process WUs, we want to modify the code in the ProcessWU
Workflow to be resilient and have the right retries set.
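The pattern described in this comment can be sketched as follows: child steps throw on failure, and the parent catches the exception, emits a failed event, and rethrows. This is a minimal sketch, not the Gobblin or Temporal API. `EventSink`, `runParent`, and the event name strings are hypothetical stand-ins, and rethrowing after the emit is an assumption, since the diff hunk ends before the rest of the catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

public class FailureEventSketch {
  /** Hypothetical stand-in for whatever submits tracking events. */
  interface EventSink {
    void emit(String eventName);
  }

  /**
   * Runs the child steps (generate, process, commit). Any exception thrown
   * by a child is caught here so that a failed event is always emitted.
   */
  static int runParent(IntSupplier childSteps, EventSink sink) {
    try {
      int processed = childSteps.getAsInt();
      sink.emit("JOB_COMPLETED"); // hypothetical event name
      return processed;
    } catch (RuntimeException e) {
      sink.emit("JOB_FAILED"); // hypothetical event name
      throw e; // assumption: the failure is still surfaced to the caller
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    System.out.println(runParent(() -> 3, events::add));
    try {
      runParent(() -> { throw new IllegalStateException("commit failed"); }, events::add);
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
    System.out.println(events);
  }
}
```

Emitting inside the catch and then rethrowing keeps both signals: the tracking event tells the service that the job failed, and the exception still marks the parent run as failed.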
Issue Time Tracking
-------------------
Worklog Id: (was: 910964)
Time Spent: 1h 10m (was: 1h)
> Fixes failed workflow paths in Temporal to properly emit GTE and fail job
> when commit fails
> -------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-2020
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2020
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> There are a few bugs in Gobblin-Temporal execution mode:
> 1. If the publishing step fails, the activity does not report a failure
> because the post-commit step that checks the dataset states is missing
> 2. No GTEs are emitted upon job failure, which makes tracking difficult
> 3. Some metadata propagation for flow execution ID with workflows is
> incorrect due to a bug that reads worker configs instead of job props
> 4. The GenerateWus activity does not return the right number of workunits
> created due to counting top level multiworkunits
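
Item 4 above can be illustrated with a small sketch of the difference between counting top-level entries and counting the work units they contain. The `Unit` class and `countLeaves` method are hypothetical and do not use Gobblin's real work unit classes. Whether the actual fix recurses through nested containers is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public class WorkUnitCountSketch {
  /** Hypothetical work unit: a leaf when it has no children, otherwise a container. */
  static final class Unit {
    final List<Unit> children;

    Unit(Unit... children) {
      this.children = Arrays.asList(children);
    }

    boolean isMulti() {
      return !children.isEmpty();
    }
  }

  /** Counts leaf units, descending into containers instead of counting them as one. */
  static int countLeaves(List<Unit> units) {
    int total = 0;
    for (Unit u : units) {
      total += u.isMulti() ? countLeaves(u.children) : 1;
    }
    return total;
  }

  public static void main(String[] args) {
    List<Unit> topLevel = Arrays.asList(
        new Unit(new Unit(), new Unit(), new Unit()), // container of 3
        new Unit(new Unit(), new Unit()),             // container of 2
        new Unit());                                  // single leaf
    System.out.println(topLevel.size());       // top-level count: 3
    System.out.println(countLeaves(topLevel)); // leaf count: 6
  }
}
```

In this sketch a container with no children is counted as a single leaf, which is a simplification of the hypothetical model.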