phet commented on code in PR #3880:
URL: https://github.com/apache/gobblin/pull/3880#discussion_r1501202935
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java:
##########
@@ -71,23 +104,69 @@ public static StateStore<TaskState> openTaskStateStoreUncached(JobState jobState
return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(), TaskState.class);
}
+ /**
+ * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return "base" dir root path for work dir (parent of inputs, output task states, etc.)
+ */
+ public static Path getWorkDirRoot(JobState jobState) {
+ return new Path(
+ new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), jobState.getJobName()),
+ jobState.getJobId());
+ }
+
/**
* ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
* @return path to {@link FsStateStore<TaskState>} backing dir
*/
public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
- Properties jobProps = jobState.getProperties();
- Path jobOutputPath = new Path(
- new Path(
- new Path(
- jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
- JobState.getJobNameFromProps(jobProps)),
- JobState.getJobIdFromProps(jobProps)),
- OUTPUT_DIR_NAME);
+ Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
return fs.makeQualified(jobOutputPath);
}
+ /** write serialized {@link WorkUnit}s in parallel into files named after the jobID and task IDs */
+ public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath, JobState jobState, FileSystem fs)
+ throws IOException {
+ String jobId = jobState.getJobId();
+ Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+
+ int numThreads = ParallelRunner.getNumThreadsConfig(jobState.getProperties());
+ Closer closer = Closer.create(); // (NOTE: try-with-resources syntax wouldn't allow `catch { closer.rethrow(t) }`)
+ try {
+ ParallelRunner parallelRunner = closer.register(new ParallelRunner(numThreads, fs));
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
+ int i = 0;
+ for (WorkUnit workUnit : workUnits) {
+ Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, targetDirPath);
+ if (i++ == 0) {
+ log.info("Writing work unit file [{}]: '{}'", i - 1, i == 1 ? workUnitFile : ("./" + workUnitFile.getName()));
+ }
Review Comment:
You're absolutely right... I originally printed them all, then thought that was likely TOO MUCH and added a limit so only the first one gets logged. I'll correct the impl...
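In the quoted loop, `log.info` only runs when `i++ == 0` held, so `i` is always 1 inside that block: `i - 1` is always 0, and the `i == 1 ? workUnitFile : ("./" + ...)` ternary can never take its `"./"` branch. A minimal sketch of one possible correction follows, assuming the intent was to log the first work-unit file by full path and a few subsequent ones by file name only, capped by a hypothetical `maxLogged` limit. It uses `java.nio.file.Path` in place of Hadoop's `Path` so it runs standalone; `WorkUnitLogging`, `describe`, and `logLines` are illustrative names, not Gobblin APIs.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Gobblin code): decides which work-unit file paths to log.
 * The index is tracked independently of the logging guard, so the
 * "full path first, then ./name" branch is actually reachable.
 */
public class WorkUnitLogging {

  /** Formats one log line: full path for index 0, "./" + file name afterwards. */
  static String describe(int index, Path workUnitFile) {
    String shown = (index == 0) ? workUnitFile.toString() : "./" + workUnitFile.getFileName();
    return String.format("Writing work unit file [%d]: '%s'", index, shown);
  }

  /** Returns the log lines for the given files, emitting at most maxLogged per-file lines. */
  static List<String> logLines(List<Path> files, int maxLogged) {
    List<String> lines = new ArrayList<>();
    int i = 0;
    for (Path file : files) {
      // In the real writer, every work unit would still be written here; only logging is capped.
      if (i < maxLogged) {
        lines.add(describe(i, file));
      }
      i++;
    }
    // Summarize the rest so the log stays bounded but still shows how many files were written.
    if (files.size() > maxLogged) {
      lines.add(String.format("... and %d more work unit files", files.size() - maxLogged));
    }
    return lines;
  }

  public static void main(String[] args) {
    List<Path> files = new ArrayList<>();
    for (int k = 0; k < 5; k++) {
      files.add(Paths.get("input", "task_" + k + ".wu"));
    }
    // Hypothetical cap of 3; not a Gobblin config key.
    logLines(files, 3).forEach(System.out::println);
  }
}
```

Keeping the counter separate from the logging condition is the key design choice here; whether to cap at one, a few, or none (and whether to emit a trailing summary) is a judgment call the thread leaves open.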
--
This is an automated message from the Apache Git Service.