phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578604945
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
- String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+ String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
throw new IOException(exc);
}
}
+ /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+ private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+ // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+ String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+ log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+ Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+ return taskStateQueueOpt.map(taskStateQueue ->
+ taskStateQueue.stream().peek(taskState ->
+ // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+ // already themselves contain every prop of their `JobState`, not all do.
+ // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+ // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+ taskState.setJobState(jobState)
Review Comment:
Now doing it the same way as [gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java#L204).
Moreover, I added a TODO above to consider whether to just leverage that `TaskStateCollectorService` method directly.
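To illustrate why the re-association matters, here is a minimal, self-contained sketch. It assumes heavily simplified, hypothetical stand-ins for `JobState` and `TaskState` (plain prop maps, with a `getProp` that falls back to the associated `JobState`), and it swaps Hadoop's `Path` for `java.nio.file.Paths` when deriving the job-id dir name from the workunits dir's parent. The prop keys and paths are made up, and returning an empty list when no task states are found is an assumption, since the hunk above is cut off before that case.

```java
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

public class TaskStateReassociationSketch {

  /** Hypothetical stand-in for Gobblin's JobState: only a bag of job-level props. */
  static class JobState {
    final Map<String, String> props = new HashMap<>();
  }

  /** Hypothetical stand-in for TaskState: its own (serialized) props plus a NON-serialized JobState link. */
  static class TaskState {
    final Map<String, String> props = new HashMap<>();
    JobState jobState; // not part of the serialized form, so null right after deserialization

    void setJobState(JobState jobState) {
      this.jobState = jobState;
    }

    /** Looks up the task's own props first, then falls back to the associated JobState (if any). */
    String getProp(String key) {
      String value = props.get(key);
      if (value == null && jobState != null) {
        value = jobState.props.get(key);
      }
      return value;
    }
  }

  /** Assumes (as the NOTE in the diff does) that the TaskState dir is a sibling of the workunits dir. */
  static String jobIdPathName(String workUnitsDir) {
    return Paths.get(workUnitsDir).getParent().getFileName().toString();
  }

  /** Re-associates every deserialized TaskState with the JobState; empty result if nothing was found (assumption). */
  static List<TaskState> reassociate(Optional<Queue<TaskState>> taskStateQueueOpt, JobState jobState) {
    return taskStateQueueOpt.map(queue -> {
      List<TaskState> taskStates = new ArrayList<>(queue);
      taskStates.forEach(taskState -> taskState.setJobState(jobState));
      return taskStates;
    }).orElse(Collections.emptyList());
  }

  public static void main(String[] args) {
    JobState jobState = new JobState();
    jobState.props.put("example.job.level.prop", "needed-for-publish");

    TaskState taskState = new TaskState(); // simulates a freshly deserialized TaskState
    taskState.props.put("example.task.prop", "task_0");
    System.out.println(taskState.getProp("example.job.level.prop")); // prints "null"

    Queue<TaskState> queue = new ArrayDeque<>();
    queue.add(taskState);
    List<TaskState> loaded = reassociate(Optional.of(queue), jobState);
    System.out.println(loaded.get(0).getProp("example.job.level.prop")); // prints "needed-for-publish"

    System.out.println(jobIdPathName("/gobblin/work/job_MyCopyJob_123/workunits")); // prints "job_MyCopyJob_123"
  }
}
```

One design note on the sketch: it applies `setJobState` via `forEach` on a materialized list rather than `Stream.peek`, because the `peek` Javadoc describes it as existing mainly for debugging, and a stream implementation may skip invoking it when it can compute the terminal result without traversing elements. With a `collect` terminal operation, as presumably follows in the diff, every element is traversed, so `peek` should still fire there.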
--
This is an automated message from the Apache Git Service.