[ 
https://issues.apache.org/jira/browse/GOBBLIN-2054?focusedWorklogId=916274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-916274
 ]

ASF GitHub Bot logged work on GOBBLIN-2054:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/24 22:53
            Start Date: 24/Apr/24 22:53
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578488823


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java:
##########
@@ -544,8 +544,7 @@ public static Optional<Class<? extends DataPublisher>> 
getJobDataPublisherClass(
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
   public static boolean shouldCommitDataInJob(State state) {
-    boolean jobCommitPolicyIsFull =
-        JobCommitPolicy.getCommitPolicy(state.getProperties()) == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;

Review Comment:
   should be much cheaper than copying all state props merely to access a 
single one!



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -104,8 +104,7 @@ public int execute(Properties jobProps, 
EventSubmitterContext eventSubmitterCont
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
           e.getClass().toString(),
-          e,
-          null
+          e

Review Comment:
   resolve -
   ```
   
gobblin/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:108:
 warning: non-varargs call of varargs method with inexact argument type for 
last parameter;
             null
             ^
   ```
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
-        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} 
indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem 
fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting 
TaskStateCollectorService::collectOutputTaskStates (whence much of this code 
was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir 
(following conventions of `MRJobLauncher`)
+    String jobIdPathName = new 
Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", 
jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = 
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by 
`CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, 
not all do.
+                // `TaskState extends WorkUnit` serialization will include its 
constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for 
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)

Review Comment:
   as in 
[gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java#L204).
  I added a TODO to consider whether we just want to call that method outright.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this 
is retryable to throw a non-retryable failure exception
-        String jobName = 
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, 
UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some 
dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} 
indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem 
fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting 
TaskStateCollectorService::collectOutputTaskStates (whence much of this code 
was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir 
(following conventions of `MRJobLauncher`)
+    String jobIdPathName = new 
Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", 
jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = 
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+                // CRITICAL: although some `WorkUnit`s, like those created by 
`CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+                // already themselves contain every prop of their `JobState`, 
not all do.
+                // `TaskState extends WorkUnit` serialization will include its 
constituent `WorkUnit`, but not the constituent `JobState`.
+                // given some `JobState` props may be essential for 
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+                taskState.setJobState(jobState)
+                // TODO - decide whether something akin necessary to 
streamline cumulative in-memory size of all issues: 
consumeTaskIssues(taskState);
+            ).collect(Collectors.toList())
+    ).orElseGet(() -> {
+      log.error("TaskStateStore successfully opened, but no task states found 
under (name) '{}'", jobIdPathName);
+      return Lists.newArrayList();
+    });
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-
-  /**
-   * Organize task states by dataset urns.
-   * @param taskStates
-   * @return A map of dataset urns to dataset task states.
-   */
-  public static Map<String, JobState.DatasetState> 
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
-    //TODO: handle skipped tasks?
-    for (TaskState taskState : taskStates) {
-      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
-      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
-      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
-    }
-
-    return datasetStatesByUrns;
-  }
-
-  private static String createDatasetUrn(Map<String, JobState.DatasetState> 
datasetStatesByUrns, TaskState taskState) {
-    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
-    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
-      JobState.DatasetState datasetState = new JobState.DatasetState();

Review Comment:
   this no-arg ctor should possibly only be used for serialization.
   
   w/o a `jobName`, this results later while persisting task states at the end 
of commit:
   ```
   Caused by: java.lang.IllegalArgumentException: Can not create a Path from a 
null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
        at org.apache.hadoop.fs.Path.<init>(Path.java:175)
        at org.apache.hadoop.fs.Path.<init>(Path.java:110)
        at 
org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
        at 
org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
        at 
org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
        at 
org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
        at 
org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
        ... 8 more
   ```
   iceberg-distcp succeeded because it happened to have this set:
   ```
   state.store.enabled=false
   ```
   
([gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java#L725)
 solution we now leverage) 





Issue Time Tracking
-------------------

            Worklog Id:     (was: 916274)
    Remaining Estimate: 0h
            Time Spent: 10m

> `CommitActivityImpl` fails for job types (sources) other than Iceberg-Distcp
> ----------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2054
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2054
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> gobblin-on-temporal execution has been failing for other job types than 
> iceberg-distcp (which uses `CopySource`).  in particular Commit fails with:
> {code}
> java.lang.IllegalArgumentException: Missing required property 
> writer.output.dir
>       at 
> com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>       at 
> org.apache.gobblin.util.WriterUtils.getWriterOutputDir(WriterUtils.java:121)
>       at 
> org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:390)
>       at 
> org.apache.gobblin.publisher.BaseDataPublisher.publishMultiTaskData(BaseDataPublisher.java:379)
>       at 
> org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:366)
>       at 
> org.apache.gobblin.publisher.DataPublisher.publish(DataPublisher.java:81)
>       at 
> org.apache.gobblin.runtime.SafeDatasetCommit.commitDataset(SafeDatasetCommit.java:260)
>       at 
> org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:168)
>       at 
> org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:64)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> this is odd because that same prop had already been used prior to commit, 
> while processing the `WorkUnit`!  moreover logging shows it to be present 
> within the `JobState`
> anyway, even when using a private build that hard-coded that property, this 
> later error arises:
> {code}
> Caused by: java.lang.IllegalArgumentException: Can not create a Path from a 
> null string
>       at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
>       at org.apache.hadoop.fs.Path.<init>(Path.java:175)
>       at org.apache.hadoop.fs.Path.<init>(Path.java:110)
>       at 
> org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
>       at 
> org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
>       at 
> org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
>       at 
> org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
>       at 
> org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
>       ... 8 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to