phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1425786741
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -209,36 +209,36 @@ private void collectOutputTaskStates() throws IOException {
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
- log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+ log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");
try {
- optionalTaskCollectorHandler.get().handle(taskStateQueue);
+ optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
} catch (Throwable t) {
if (isJobProceedOnCollectorServiceFailure) {
log.error("Failed to commit dataset while job proceeds", t);
- SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+ SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
} else {
throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
}
}
}
// Notify the listeners for the completion of the tasks
- this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+ this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
}
/**
- * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s
+ * Reads in a @{@link StateStore} and deserializes all task states found in the provided table name
Review Comment:
typo: `@{@link StateStore}` should be `{@link StateStore}`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -190,13 +190,13 @@ protected void shutDown() throws Exception {
*/
private void collectOutputTaskStates() throws IOException {
- final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
- if (taskStateQueue == null) {
+ final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+ if (!taskStateQueue.isPresent()) {
return;
}
// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
- for (TaskState taskState : taskStateQueue) {
+ for (TaskState taskState : taskStateQueue.get()) {
Review Comment:
nit: the farther down we get, the harder it is to keep track of whether these `.get()` calls are truly safe (i.e. previously checked). The common convention to establish safety looks like:
```
if (!optTSQ.isPresent()) {
return;
} // could be `else`, if you don't mind indentation
Queue<TaskState> tsq = optTSQ.get();
// (henceforth, no more mention of `optTSQ`)
...
```
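For illustration, a minimal runnable sketch of that convention, assuming `java.util.Optional` (Guava's `Optional` exposes the same `isPresent()`/`get()` pair). `UnwrapOnceSketch` and its `TaskState` are hypothetical stand-ins, not Gobblin classes; the method only mirrors the shape of `collectOutputTaskStates()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

class UnwrapOnceSketch {

    // Hypothetical stand-in for org.apache.gobblin.runtime.TaskState (just a task id).
    static final class TaskState {
        final String taskId;

        TaskState(String taskId) {
            this.taskId = taskId;
        }
    }

    // Mirrors the shape of collectOutputTaskStates(): returns the ids of the
    // collected task states, or an empty list when there is nothing to collect.
    static List<String> collectOutputTaskStates(Optional<Queue<TaskState>> optTaskStateQueue) {
        if (!optTaskStateQueue.isPresent()) {
            return List.of();
        }
        // The only get() call, guarded by the isPresent() check directly above.
        Queue<TaskState> taskStateQueue = optTaskStateQueue.get();
        // From here on only the unwrapped queue is referenced, never optTaskStateQueue.
        List<String> collectedIds = new ArrayList<>();
        for (TaskState taskState : taskStateQueue) {
            collectedIds.add(taskState.taskId);
        }
        return collectedIds;
    }
}
```

Because the `Optional` is dropped right after the guard, a reader never has to scroll back to confirm that a later `.get()` is safe.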
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -54,9 +54,14 @@ public int process(WUProcessingSpec workSpec) {
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
int result = commitWorkflow.commit(workSpec);
+ if (result == 0) {
+ log.warn("No work units committed at the job level. They could be committed at a task level.");
+ }
return result;
+ } else {
+ log.error("No workunits processed, so no commit will be attempted.");
Review Comment:
nit: strike out "will be" or "will be attempted", since the rest of the message is in the past tense
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -75,11 +74,14 @@ public int commit(WUProcessingSpec workSpec) {
Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
- Collection<TaskState> taskStateQueue =
- ImmutableList.copyOf(
- TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
- commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
- return taskStateQueue.size();
+ Optional<Queue<TaskState>> taskStateQueue =
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
+ if (!taskStateQueue.isPresent()) {
+ log.error("No task states found at " + jobOutputPath);
+ return 0;
+ }
+ commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue.get()), globalGobblinContext);
Review Comment:
Same comment as above: call `.get()` only once, then make no further use of the `Optional`.
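A hypothetical sketch of the same idea applied to the tail of `commit(...)`. `CommitUnwrapOnceSketch`, its `TaskState`, and the one-argument `commitTaskStates` are stand-ins (the real call also passes `jobState` and `globalGobblinContext`); `List.copyOf` replaces Guava's `ImmutableList.copyOf` only to keep the sketch dependency-free, and returning the size mirrors the removed `return taskStateQueue.size();` line:

```java
import java.util.List;
import java.util.Optional;
import java.util.Queue;

class CommitUnwrapOnceSketch {

    // Hypothetical stand-in for org.apache.gobblin.runtime.TaskState.
    static final class TaskState {
        final String taskId;

        TaskState(String taskId) {
            this.taskId = taskId;
        }
    }

    // Hypothetical stand-in for commitTaskStates(jobState, taskStates, globalGobblinContext); no-op here.
    static void commitTaskStates(List<TaskState> taskStates) {
    }

    static int commit(Optional<Queue<TaskState>> optTaskStateQueue, String jobOutputPath) {
        if (!optTaskStateQueue.isPresent()) {
            System.err.println("No task states found at " + jobOutputPath);
            return 0;
        }
        // Unwrap once into an immutable copy; the Optional is not referenced again below.
        List<TaskState> taskStates = List.copyOf(optTaskStateQueue.get());
        commitTaskStates(taskStates);
        return taskStates.size();
    }
}
```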
--
This is an automated message from the Apache Git Service.