[ 
https://issues.apache.org/jira/browse/GOBBLIN-2020?focusedWorklogId=911340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-911340
 ]

ASF GitHub Bot logged work on GOBBLIN-2020:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Mar/24 17:25
            Start Date: 25/Mar/24 17:25
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1537942990


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -77,9 +77,9 @@ public static String 
qualifyNamePerExecWithoutFlowExecId(String name, Config wor
   }
 
   /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
workerConfig) {
-    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(workerConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
workerConfig);
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
jobProps) {

Review Comment:
   javadoc still names `workerConfig`.  also, we could be more agnostic and 
just call it `config`, so it's up to the caller of what to pass in, since 
really you haven't altered impl semantics here, only naming.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -60,13 +62,17 @@ public class CommitActivityImpl implements CommitActivity {
 
   static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
   static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
   @Override
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    String jobName = UNDEFINED_JOB_NAME;

Review Comment:
   tip: clearer to encode "undefined semantics" within the type system with 
`Optional`, rather than merely when initializing the variable.
   
   then for reading, use:
   ```
   optJobName.orElse("<<job_name_stub>>")
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+        }
       }
+      timer.stop();
+    } catch (Exception e) {
+      // Emit a failed GobblinTrackingEvent to record job failures
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+          e.getClass().toString(),

Review Comment:
   I can't recall whether or not this differs from `.getName()`... but 
semantically, I believe the latter is what you're going for



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)

Review Comment:
   shall we do so?  (I know you throw `IOException`, but couldn't count 
mismatch still be possible?)
   
   if not, let's update the TODO or even replace w/ a "finalized" comment 
stating the strategy



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -75,7 +75,7 @@ public int generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSub
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs);
 
-      return workUnits.size();
+      return jobState.getTaskCount();

Review Comment:
   since I got it wrong the first time, this likely deserves a comment.
   
   also, I may not be logging the count anywhere, but it could be a meaningful 
thing to log both the MWUs and individual WUs count.  (in fact that might do 
double duty-duty in lieu of that comment I just suggested)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
 
       IteratorExecutor.logFailures(result, null, 10);
 
+      Set<String> failedDatasetUrns = new HashSet<>();
+      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {

Review Comment:
   nit, prefer:
   ```
   Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
       .filter(...)
       .collect(...)
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
 
       IteratorExecutor.logFailures(result, null, 10);
 
+      Set<String> failedDatasetUrns = new HashSet<>();
+      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+        // Set the overall job state to FAILED if the job failed to process 
any dataset

Review Comment:
   I don't see that.  where is it set?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }

Review Comment:
   alas, I authored this, but now see a "helper" method would be clearer to 
read; e.g.:
   ```
   protected static WUProcessingSpec createProcessingSpec(Properties jobProps, 
EventSubmitterContext eventSubmitterContext);
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;

Review Comment:
   isn't no WUs (aka. no diff between source and dest) a valid outcome?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 911340)
    Time Spent: 1h 50m  (was: 1h 40m)

> Fixes failed workflow paths in Temporal to properly emit GTE and fail job 
> when commit fails
> -------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2020
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2020
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> There are a few bugs in Gobblin-Temporal execution mode:
> 1. If the publishing step fails, the activity does not report a failure due 
> to missing a step post commit to check the dataset states
> 2. No GTEs are emitted upon job failure, which makes tracking difficult
> 3. Some metadata propagation for flow execution ID with workflows is 
> incorrect due to a bug reading worker configs instead of job props
> 4. The GenerateWus activity does not return the right number of workunits 
> created due to counting top level multiworkunits



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to