phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578852308


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
-        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} 
indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem 
fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting 
TaskStateCollectorService::collectOutputTaskStates (whence much of this code 
was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir 
(following conventions of `MRJobLauncher`)
+    String jobIdPathName = new 
Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", 
jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = 
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->

Review Comment:
   I'd stay away from that, since there is more to 
`TaskStateCollectorService::collectOutputTaskStates`, namely creating a 
circular ref between the `JobState` and `TaskState`.  That's only something I'd 
do if absolutely necessary, and clearly there's no need for it in 
gobblin-on-temporal's `CommitActivityImpl`.
   
   As this is long-standing gobblin-on-MR behavior, reworking it is potentially 
a BIG risk, and I don't foresee enough payoff to justify it.
   
   Rather than change the `TaskStateCollectorService` impl, I just added this 
TODO:
   ```
   // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
   ```
   
   Pragmatically weighing the current one-line bug fix against a full 
rework/replacement, I went w/ the former out of expedience.
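
For illustration only, here is a minimal sketch of the circular-ref concern raised above. `SketchJobState`, `SketchTaskState`, `linkBothWays`, and `loadOnly` are hypothetical stand-ins invented for this example; they are not the real Gobblin `JobState`/`TaskState` classes, and the exact way `collectOutputTaskStates` wires the two together is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; NOT the real Gobblin classes.
public class CircularRefSketch {

  static class SketchJobState {
    final String jobName;
    final List<SketchTaskState> taskStates = new ArrayList<>();

    SketchJobState(String jobName) {
      this.jobName = jobName;
    }
  }

  static class SketchTaskState {
    final String taskId;
    SketchJobState jobState; // back-reference; stays null unless explicitly linked

    SketchTaskState(String taskId) {
      this.taskId = taskId;
    }
  }

  /** Links in both directions (job -> task and task -> job), i.e. a circular ref. */
  static void linkBothWays(SketchJobState job, List<SketchTaskState> tasks) {
    for (SketchTaskState task : tasks) {
      task.jobState = job;
      job.taskStates.add(task);
    }
  }

  /** Returns a copy of the loaded task states without touching any job state. */
  static List<SketchTaskState> loadOnly(List<SketchTaskState> tasks) {
    return new ArrayList<>(tasks);
  }

  public static void main(String[] args) {
    SketchJobState job = new SketchJobState("sketch-job");
    List<SketchTaskState> loaded = new ArrayList<>();
    loaded.add(new SketchTaskState("task-0"));
    loaded.add(new SketchTaskState("task-1"));

    // Load-only path: the job state is left exactly as it was.
    List<SketchTaskState> plain = loadOnly(loaded);
    System.out.println("load-only: job holds " + job.taskStates.size()
        + " task states; loaded " + plain.size());

    // Linking path: each side now holds a reference to the other.
    linkBothWays(job, loaded);
    System.out.println("linked: task points back to its job? "
        + (job.taskStates.get(0).jobState == job));
  }
}
```

In this sketch the load-only path returns task states without mutating the job state, which is roughly the narrower behavior a commit step needs; the linking path is the extra coupling the comment prefers to avoid unless absolutely necessary.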



