phet commented on code in PR #3880:
URL: https://github.com/apache/gobblin/pull/3880#discussion_r1501202935


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java:
##########
@@ -71,23 +104,69 @@ public static StateStore<TaskState> openTaskStateStoreUncached(JobState jobState
     return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), TaskState.class);
   }
 
+  /**
+   * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return "base" dir root path for work dir (parent of inputs, output task states, etc.)
+   */
+  public static Path getWorkDirRoot(JobState jobState) {
+    return new Path(
+        new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), jobState.getJobName()),
+        jobState.getJobId());
+  }
+
   /**
    * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
    * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
    * @return path to {@link FsStateStore<TaskState>} backing dir
    */
   public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
-    Properties jobProps = jobState.getProperties();
-    Path jobOutputPath = new Path(
-        new Path(
-            new Path(
-                jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
-                JobState.getJobNameFromProps(jobProps)),
-            JobState.getJobIdFromProps(jobProps)),
-        OUTPUT_DIR_NAME);
+    Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
     return fs.makeQualified(jobOutputPath);
   }
 
+  /** write serialized {@link WorkUnit}s in parallel into files named after the jobID and task IDs */
+  public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath, JobState jobState, FileSystem fs)
+      throws IOException {
+    String jobId = jobState.getJobId();
+    Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+
+    int numThreads = ParallelRunner.getNumThreadsConfig(jobState.getProperties());
+    Closer closer = Closer.create(); // (NOTE: try-with-resources syntax wouldn't allow `catch { closer.rethrow(t) }`)
+    try {
+      ParallelRunner parallelRunner = closer.register(new ParallelRunner(numThreads, fs));
+
+      JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
+      int i = 0;
+      for (WorkUnit workUnit : workUnits) {
+        Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, targetDirPath);
+        if (i++ == 0) {
+          log.info("Writing work unit file [{}]: '{}'", i - 1, i == 1 ? workUnitFile : ("./" + workUnitFile.getName()));
+        }

Review Comment:
   You're absolutely right... I originally printed them all, then thought it would likely be TOO MUCH. I'll correct the impl...
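In the hunk above, `i++ == 0` admits only the first work unit, so the `i == 1 ? ... : ("./" + ...)` branch meant for later files can never run. One possible way to log enough without logging everything is sketched below. It is a hypothetical helper, not the fix that landed in this PR: plain `String` paths stand in for Hadoop's `Path`, messages are returned instead of sent to a logger, and `maxToLog`, `fileName`, and the example file names are invented for illustration. It shows the full path only for the first file, then `./<name>` for the next ones up to the cap, then a count of the files it skipped.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: bounded logging of work-unit file paths (not Gobblin's actual code). */
final class WorkUnitFileLogging {

  /** Trailing file name of a '/'-separated path; stand-in for Hadoop's Path#getName(). */
  static String fileName(String path) {
    int slash = path.lastIndexOf('/');
    return slash < 0 ? path : path.substring(slash + 1);
  }

  /** Builds at most maxToLog per-file messages, plus one summary line if any files were skipped. */
  static List<String> logMessages(List<String> workUnitFiles, int maxToLog) {
    List<String> messages = new ArrayList<>();
    int i = 0;
    for (String file : workUnitFiles) {
      if (i < maxToLog) {
        // full path once for context; later files share the same parent dir, so show only the name
        String shown = (i == 0) ? file : ("./" + fileName(file));
        messages.add(String.format("Writing work unit file [%d]: '%s'", i, shown));
      }
      i++;
    }
    if (i > maxToLog) {
      messages.add(String.format("... (%d more work unit file(s) not logged)", i - maxToLog));
    }
    return messages;
  }

  public static void main(String[] args) {
    // hypothetical file names; the real ones come from WorkUnitPathCalculator
    List<String> files = List.of(
        "/root/myJob/job_1/input/job_1_0.wu",
        "/root/myJob/job_1/input/job_1_1.wu",
        "/root/myJob/job_1/input/job_1_2.wu");
    logMessages(files, 2).forEach(System.out::println);
  }
}
```

Returning the messages keeps the sketch testable; in the real method each message would go to `log.info(...)` as it is produced.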

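The work-dir layout that `getWorkDirRoot` and `getTaskStateStorePath` derive in the hunk (the `MR_JOB_ROOT_DIR_KEY` value, then the job name, then the job ID, with the task-state store in an output subdir and serialized work units in an input subdir) can be sketched with `java.nio.file.Path` standing in for Hadoop's `Path`. This is an illustration under assumptions: the literal values `"input"` and `"output"` for `INPUT_DIR_NAME`/`OUTPUT_DIR_NAME` are guesses, and the real code additionally qualifies the path via `fs.makeQualified`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of the MRJobLauncher-style work-dir layout, using java.nio instead of Hadoop paths. */
final class WorkDirLayout {
  // assumed values; the actual constants live in Gobblin's JobStateUtils / MRJobLauncher
  static final String INPUT_DIR_NAME = "input";
  static final String OUTPUT_DIR_NAME = "output";

  /** {@code <mrJobRootDir>/<jobName>/<jobId>}: parent of inputs, output task states, etc. */
  static Path workDirRoot(String mrJobRootDir, String jobName, String jobId) {
    return Paths.get(mrJobRootDir, jobName, jobId);
  }

  /** {@code <workDirRoot>/output}: backing dir of the task-state store. */
  static Path taskStateStorePath(String mrJobRootDir, String jobName, String jobId) {
    return workDirRoot(mrJobRootDir, jobName, jobId).resolve(OUTPUT_DIR_NAME);
  }

  /** {@code <workDirRoot>/input}: where serialized work units are written. */
  static Path workUnitsDir(String mrJobRootDir, String jobName, String jobId) {
    return workDirRoot(mrJobRootDir, jobName, jobId).resolve(INPUT_DIR_NAME);
  }

  public static void main(String[] args) {
    System.out.println(taskStateStorePath("/gobblin/work", "myJob", "job_myJob_123"));
    System.out.println(workUnitsDir("/gobblin/work", "myJob", "job_myJob_123"));
  }
}
```

Factoring the shared prefix into one `workDirRoot` helper mirrors the refactor in the hunk: both the input and output dirs hang off a single root, so the two cannot drift apart.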


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.