[ https://issues.apache.org/jira/browse/GOBBLIN-2054?focusedWorklogId=916279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-916279 ]

ASF GitHub Bot logged work on GOBBLIN-2054:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/24 22:57
            Start Date: 24/Apr/24 22:57
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578604945


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
-        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+    String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, not all do.
+                // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)

Review Comment:
   now doing as is done by [gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java#L204).
   
   moreover, I added a TODO above to consider whether to just leverage that `TaskStateCollectorService` method directly, outright.
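The NOTE in the diff assumes the `TaskState` dir is a sibling of the workunits dir, so the job-ID name component is simply the name of the workunits dir's parent. Here is a minimal sketch of that path arithmetic. It uses `java.nio.file.Path` in place of Hadoop's `Path` to stay dependency-free, and the directory layout and the `jobIdPathName` helper are hypothetical:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Sketch (hypothetical layout) of the sibling-directory convention from the diff:
 *   <root>/<jobId>/workunits   and   <root>/<jobId>/<taskstates dir>
 * so the job-ID name is the final component of the workunits dir's parent.
 */
public class TaskStateDirSketch {

  /** Returns the name of the workunits dir's parent, i.e. the job-ID dir name under this convention. */
  static String jobIdPathName(String workUnitsDir) {
    Path parent = Paths.get(workUnitsDir).getParent();
    if (parent == null || parent.getFileName() == null) {
      // e.g. a bare "workunits" or "/workunits" has no named parent to derive a job ID from
      throw new IllegalArgumentException("workunits dir has no named parent: " + workUnitsDir);
    }
    return parent.getFileName().toString();
  }

  public static void main(String[] args) {
    // Hypothetical workunits dir; its parent's name is taken as the job-ID path component
    String workUnitsDir = "/gobblin/work/job_demo_123/workunits";
    System.out.println(jobIdPathName(workUnitsDir)); // prints job_demo_123
  }
}
```

The explicit null check matters because `getParent()` returns `null` for a single-component path, which would otherwise surface later as an unhelpful `NullPointerException`.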





Issue Time Tracking
-------------------

    Worklog Id:     (was: 916279)
    Time Spent: 50m  (was: 40m)

> `CommitActivityImpl` fails for job types (sources) other than Iceberg-Distcp
> ----------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2054
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2054
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> gobblin-on-temporal execution has been failing for job types other than iceberg-distcp (which uses `CopySource`). in particular, Commit fails with:
> {code}
> java.lang.IllegalArgumentException: Missing required property writer.output.dir
>       at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>       at org.apache.gobblin.util.WriterUtils.getWriterOutputDir(WriterUtils.java:121)
>       at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:390)
>       at org.apache.gobblin.publisher.BaseDataPublisher.publishMultiTaskData(BaseDataPublisher.java:379)
>       at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:366)
>       at org.apache.gobblin.publisher.DataPublisher.publish(DataPublisher.java:81)
>       at org.apache.gobblin.runtime.SafeDatasetCommit.commitDataset(SafeDatasetCommit.java:260)
>       at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:168)
>       at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:64)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> this is odd because that same prop had already been used prior to commit, while processing the `WorkUnit`! moreover, logging shows it to be present within the `JobState`.
> anyway, even when using a private build that hard-coded that property, this later error arises:
> {code}
> Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
>       at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
>       at org.apache.hadoop.fs.Path.<init>(Path.java:175)
>       at org.apache.hadoop.fs.Path.<init>(Path.java:110)
>       at org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
>       at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
>       at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
>       at org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
>       at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
>       ... 8 more
> {code}
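The first symptom above is a property that is present in the `JobState` yet reported missing at commit. That symptom is consistent with the explanation in the PR diff's CRITICAL comment: a serialized `TaskState` carries its `WorkUnit` props but not its `JobState` props, so it must be re-associated after deserialization. The sketch below is a hypothetical, simplified model of that fallback, not Gobblin's actual `TaskState`/`JobState` classes. `SimpleJobState`, `SimpleTaskState`, and `getRequiredProp` are illustrative names only, and the output dir value is made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical model (NOT Gobblin's classes) of re-associating deserialized task states with job state. */
public class JobStateReassociationSketch {

  /** Stand-in for job-level configuration. */
  static final class SimpleJobState {
    final Properties props = new Properties();
  }

  /** Stand-in for a task state: its own (workunit) props plus an optional job-state fallback. */
  static final class SimpleTaskState {
    final Properties ownProps = new Properties();
    // Not part of the serialized form in this model: null right after "deserialization".
    private SimpleJobState jobState;

    void setJobState(SimpleJobState jobState) {
      this.jobState = jobState;
    }

    /** Own props take precedence; job-level props are consulted only once re-associated. */
    String getProp(String key) {
      String value = ownProps.getProperty(key);
      if (value == null && jobState != null) {
        value = jobState.props.getProperty(key);
      }
      return value;
    }

    /** Mimics a Preconditions-style check for a required property. */
    String getRequiredProp(String key) {
      String value = getProp(key);
      if (value == null) {
        throw new IllegalArgumentException("Missing required property " + key);
      }
      return value;
    }
  }

  public static void main(String[] args) {
    SimpleJobState jobState = new SimpleJobState();
    jobState.props.setProperty("writer.output.dir", "/tmp/hypothetical/output");

    // Simulate freshly deserialized task states: workunit props only, no job state attached.
    List<SimpleTaskState> taskStates = new ArrayList<>();
    taskStates.add(new SimpleTaskState());

    try {
      taskStates.get(0).getRequiredProp("writer.output.dir");
    } catch (IllegalArgumentException e) {
      System.out.println("before: " + e.getMessage()); // before: Missing required property writer.output.dir
    }

    // Re-associate every task state with the job state, analogous to the diff's setJobState call.
    taskStates.forEach(ts -> ts.setJobState(jobState));
    System.out.println("after: " + taskStates.get(0).getRequiredProp("writer.output.dir")); // after: /tmp/hypothetical/output
  }
}
```

The diff applies `setJobState` inside `Stream.peek`, whose Javadoc says it exists mainly to support debugging. The sketch uses `forEach` instead. A `peek` still reaches every element only as long as the pipeline's terminal operation consumes them all.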



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
