phet commented on code in PR #3900:
URL: https://github.com/apache/gobblin/pull/3900#discussion_r1537942990


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -77,9 +77,9 @@ public static String 
qualifyNamePerExecWithoutFlowExecId(String name, Config wor
   }
 
   /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
workerConfig) {
-    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(workerConfig, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
workerConfig);
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
jobProps) {

Review Comment:
   The javadoc still names `workerConfig`. Also, we could be more agnostic and just call it `config`, so that it's up to the caller what to pass in, since you haven't really altered the implementation semantics here, only the naming.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -60,13 +62,17 @@ public class CommitActivityImpl implements CommitActivity {
 
   static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
   static int DEFAULT_NUM_COMMIT_THREADS = 1;
+  static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
   @Override
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    String jobName = UNDEFINED_JOB_NAME;

Review Comment:
   Tip: it's clearer to encode the "undefined" semantics within the type system, using `Optional`, rather than merely in how the variable is initialized.
   
   Then, when reading it, use:
   ```
   optJobName.orElse("<<job_name_stub>>")
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+        }
       }
+      timer.stop();
+    } catch (Exception e) {
+      // Emit a failed GobblinTrackingEvent to record job failures
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+          e.getClass().toString(),

Review Comment:
   I couldn't recall whether this differs from `.getName()`. It does: `Class.toString()` prefixes the name with `class ` (or `interface `). Semantically, I believe `.getName()` is what you're going for.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }
+
+        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+        if (numWUsProcessed != numWUsGenerated) {
+          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
+          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)

Review Comment:
   Shall we do so? (I know you throw `IOException`, but couldn't a count mismatch still be possible?)
   
   If not, let's update the TODO, or even replace it with a "finalized" comment stating the strategy.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java:
##########
@@ -75,7 +75,7 @@ public int generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSub
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs);
 
-      return workUnits.size();
+      return jobState.getTaskCount();

Review Comment:
   Since I got this wrong the first time, it likely deserves a comment.
   
   Also, I may not be logging the count anywhere, but it could be meaningful to log both the MWU count and the individual WU count. (In fact, that might do double duty in lieu of the comment I just suggested.)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
 
       IteratorExecutor.logFailures(result, null, 10);
 
+      Set<String> failedDatasetUrns = new HashSet<>();
+      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {

Review Comment:
   Nit: prefer:
   ```
   Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
       .filter(...)
       .collect(...)
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -135,9 +141,21 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
 
       IteratorExecutor.logFailures(result, null, 10);
 
+      Set<String> failedDatasetUrns = new HashSet<>();
+      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+        // Set the overall job state to FAILED if the job failed to process 
any dataset

Review Comment:
   I don't see that. Where is it set?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;
+    try {
+      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      if (numWUsGenerated > 0) {
+        JobState jobState = new JobState(jobProps);
+        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
+        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+        }

Review Comment:
   Alas, I authored this, but I now see that a "helper" method would be clearer to read, e.g.:
   ```
   protected static WUProcessingSpec createProcessingSpec(Properties jobProps, 
EventSubmitterContext eventSubmitterContext);
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -73,38 +75,47 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-
-    int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
-    if (numWUsGenerated > 0) {
-      ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow();
-
-      JobState jobState = new JobState(jobProps);
-      URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-      Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
-      WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-      // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-      if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
 &&
-          
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-        int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-        int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-        wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-      }
-
-      int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-      if (numWUsProcessed != numWUsGenerated) {
-        log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-        // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
+    int numWUsGenerated = 0;

Review Comment:
   Isn't generating zero WUs (a.k.a. no diff between source and destination) a valid outcome?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to