[
https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=934542&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934542
]
ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Sep/24 15:09
Start Date: 12/Sep/24 15:09
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1757074356
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir,
Optional<Map<String, LocalResource>>
}
for (FileStatus libJarFile : libJarFiles) {
- Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
- this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
- if (resourceMap.isPresent()) {
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs,
libJarFile, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent())
{
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ } else {
+ LOGGER.warn("Failed to upload jar file {} to HDFS",
libJarFile.getPath());
}
}
}
-
- private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap,
- Path destDir, FileSystem localFs) throws IOException {
+ private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+ FileSystem localFs) throws IOException {
for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
Path srcFilePath = new Path(jarFilePath);
- Path destFilePath = new Path(destDir, srcFilePath.getName());
- if (localFs.exists(srcFilePath)) {
- this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ FileStatus localJar = localFs.getFileStatus(srcFilePath);
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs,
localJar, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+ if (resourceMap.isPresent()) {
+ YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ }
} else {
- LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
- }
- if (resourceMap.isPresent()) {
- YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
Review Comment:
I guess that's fine to start. how about a `// TODO: decide whether to
fail-fast here, given the job may be unable to run w/o it`
##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -720,26 +731,26 @@ private void addLibJars(Path srcLibJarDir,
Optional<Map<String, LocalResource>>
}
for (FileStatus libJarFile : libJarFiles) {
- Path destFilePath = new Path(destDir, libJarFile.getPath().getName());
- this.fs.copyFromLocalFile(libJarFile.getPath(), destFilePath);
- if (resourceMap.isPresent()) {
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs,
libJarFile, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, libJarFile,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath) && resourceMap.isPresent())
{
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ } else {
+ LOGGER.warn("Failed to upload jar file {} to HDFS",
libJarFile.getPath());
}
}
}
-
- private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap,
- Path destDir, FileSystem localFs) throws IOException {
+ private void addAppJars(String jarFilePathList, Optional<Map<String,
LocalResource>> resourceMap, Path destCacheDir, Path unsharedDir,
+ FileSystem localFs) throws IOException {
for (String jarFilePath : SPLITTER.split(jarFilePathList)) {
Path srcFilePath = new Path(jarFilePath);
- Path destFilePath = new Path(destDir, srcFilePath.getName());
- if (localFs.exists(srcFilePath)) {
- this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+ FileStatus localJar = localFs.getFileStatus(srcFilePath);
+ Path destFilePath = HdfsJarUploadUtils.calculateDestJarFile(this.fs,
localJar, unsharedDir, destCacheDir);
+ if (HdfsJarUploadUtils.uploadJarToHdfs(fs, localJar,
MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT, destFilePath)) {
+ if (resourceMap.isPresent()) {
+ YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ }
} else {
- LOGGER.warn("The src destination " + srcFilePath + " doesn't exists");
- }
- if (resourceMap.isPresent()) {
- YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath,
LocalResourceType.FILE, resourceMap.get());
+ LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
Review Comment:
I guess that's fine to start. how about a
```
// TODO: decide whether to fail-fast here, given the job may be unable to
run w/o it
```
Issue Time Tracking
-------------------
Worklog Id: (was: 934542)
Time Spent: 1h 10m (was: 1h)
> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
> Key: GOBBLIN-2135
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of
> jar caching, where MRJobLauncher creates a monthly cache that is
> automatically cleaned up by subsequent executions performed 2 months in
> advance.
> YARN/MR requires uploading jars to HDFS, this step can be quite slow (~15
> mins for a sizeable job to get all the jars), and given that many jobs do
> share the same jars, it makes sense to cache them together and only provide
> YARN the shared path.
> We also want to ensure that SNAPSHOT jars are other files are not uploaded to
> a cache, since they are not immutable unlike jar versions on Artifactory.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)