[
https://issues.apache.org/jira/browse/GOBBLIN-2020?focusedWorklogId=911340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-911340
]
ASF GitHub Bot logged work on GOBBLIN-2020:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Mar/24 17:25
Start Date: 25/Mar/24 17:25
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1537942990
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -77,9 +77,9 @@ public static String
qualifyNamePerExecWithoutFlowExecId(String name, Config wor
}
/** @return execution-specific name, incorporating any {@link
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
- public static String qualifyNamePerExecWithFlowExecId(String name, Config
workerConfig) {
- Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(workerConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
- return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
workerConfig);
+ public static String qualifyNamePerExecWithFlowExecId(String name, Config
jobProps) {
Review Comment:
javadoc still names `workerConfig`. also, we could be more agnostic and
just call it `config`, so it's up to the caller of what to pass in, since
really you haven't altered impl semantics here, only naming.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -60,13 +62,17 @@ public class CommitActivityImpl implements CommitActivity {
static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
@Override
public int commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ String jobName = UNDEFINED_JOB_NAME;
Review Comment:
tip: clearer to encode "undefined semantics" within the type system with
`Optional`, rather than merely when initializing the variable.
then for reading, use:
```
optJobName.orElse("<<job_name_stub>>")
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
+ try {
+ numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+
+ int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+ if (numWUsProcessed != numWUsGenerated) {
+ log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
+ // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ }
}
+ timer.stop();
+ } catch (Exception e) {
+ // Emit a failed GobblinTrackingEvent to record job failures
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+ e.getClass().toString(),
Review Comment:
I can't recall whether or not this differs from `.getName()`... but
semantically, I believe the latter is what you're going for
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
+ try {
+ numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+
+ int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+ if (numWUsProcessed != numWUsGenerated) {
+ log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
+ // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
Review Comment:
shall we do so? (I know you throw `IOException`, but couldn't count
mismatch still be possible?)
if not, let's update the TODO or even replace w/ a "finalized" comment
stating the strategy
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -75,7 +75,7 @@ public int generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSub
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs);
- return workUnits.size();
+ return jobState.getTaskCount();
Review Comment:
since I got it wrong the first time, this likely deserves a comment.
also, I may not be logging the count anywhere, but it could be a meaningful
thing to log both the MWUs and individual WUs count. (in fact that might do
double duty-duty in lieu of that comment I just suggested)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry
IteratorExecutor.logFailures(result, null, 10);
+ Set<String> failedDatasetUrns = new HashSet<>();
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
Review Comment:
nit, prefer:
```
Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
.filter(...)
.collect(...)
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry
IteratorExecutor.logFailures(result, null, 10);
+ Set<String> failedDatasetUrns = new HashSet<>();
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+ // Set the overall job state to FAILED if the job failed to process
any dataset
Review Comment:
I don't see that. where is it set?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
+ try {
+ numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
Review Comment:
alas, I authored this, but now see a "helper" method would be clearer to
read; e.g.:
```
protected static WUProcessingSpec createProcessingSpec(Properties jobProps,
EventSubmitterContext eventSubmitterContext);
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
-
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
- if (numWUsGenerated > 0) {
- ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
-
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
-
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
+ int numWUsGenerated = 0;
Review Comment:
isn't no WUs (aka. no diff between source and dest) a valid outcome?
Issue Time Tracking
-------------------
Worklog Id: (was: 911340)
Time Spent: 1h 50m (was: 1h 40m)
> Fixes failed workflow paths in Temporal to properly emit GTE and fail job
> when commit fails
> -------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-2020
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2020
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> There are a few bugs in Gobblin-Temporal execution mode:
> 1. If the publishing step fails, the activity does not report a failure due
> to missing a step post commit to check the dataset states
> 2. No GTEs are emitted upon job failure, which makes tracking difficult
> 3. Some metadata propagation for flow execution ID with workflows is
> incorrect due to a bug reading worker configs instead of job props
> 4. The GenerateWus activity does not return the right number of workunits
> created due to counting top level multiworkunits
--
This message was sent by Atlassian Jira
(v8.20.10#820010)