[
https://issues.apache.org/jira/browse/GOBBLIN-2135?focusedWorklogId=934343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934343
]
ASF GitHub Bot logged work on GOBBLIN-2135:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Sep/24 19:52
Start Date: 11/Sep/24 19:52
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1755496938
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
for (FileStatus status : fileStatusList) {
+ Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
- int retryCount = 0;
- boolean shouldFileBeAddedIntoDC = true;
- Path destJarFile = calculateDestJarFile(status, jarFileDir);
- // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
- while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- try {
- if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
- throw new IOException("Waiting for file to complete on uploading ... ");
- }
- // Set the first parameter as false for not deleting sourceFile
- // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
- // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
- this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
- } catch (IOException | InterruptedException e) {
- LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
- retryCount += 1;
- if (retryCount >= this.jarFileMaximumRetry) {
- LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
- // If retry reaches upper limit, skip copying this file.
- shouldFileBeAddedIntoDC = false;
- break;
- }
- }
- }
- if (shouldFileBeAddedIntoDC) {
+ if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+ } else {
+ LOG.error("Failed to upload jar file: " + status.getPath());
Review Comment:
Possibly we should throw an error here, but given that another job could be
uploading the same jars, I think it is better to let the job attempt to run.
If the job then fails, it should emit the failed event anyway.
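For readers without the removed loop in front of them, the behavior being discussed can be sketched roughly as below. This is an illustration only, not the actual HdfsJarUploadUtils implementation: it assumes java.nio.file on the local filesystem in place of Hadoop's FileSystem, and the names JarUploadSketch, uploadWithRetry, addJars and demo are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; uses the local filesystem (java.nio.file) instead of HDFS.
class JarUploadSketch {

  // Copies src to dest until dest exists with the same length as src.
  // Returns false once maxRetries attempts have failed, so the caller decides what to do.
  static boolean uploadWithRetry(Path src, Path dest, int maxRetries, long waitMillis) {
    int retryCount = 0;
    while (true) {
      try {
        if (Files.exists(dest)) {
          if (Files.size(dest) == Files.size(src)) {
            return true; // already uploaded, possibly by another job
          }
          // Size mismatch: another job may still be copying. Wait, then count it as a retry.
          Thread.sleep(waitMillis);
          throw new IOException("Waiting for upload to complete: " + dest);
        }
        // No REPLACE_EXISTING option, so this throws if dest appeared in the meantime.
        Files.copy(src, dest);
      } catch (IOException e) {
        retryCount++;
        if (retryCount >= maxRetries) {
          return false;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  // Mirrors the choice in the review comment: log and skip a failed jar rather than throw.
  static List<Path> addJars(List<Path> jars, Path destDir, int maxRetries) {
    List<Path> classpath = new ArrayList<>();
    for (Path jar : jars) {
      Path dest = destDir.resolve(jar.getFileName());
      if (uploadWithRetry(jar, dest, maxRetries, 10L)) {
        classpath.add(dest);
      } else {
        System.err.println("Failed to upload jar file: " + jar);
      }
    }
    return classpath;
  }

  // Small demo: one jar uploads fine, one targets a missing directory and is skipped.
  static int[] demo() {
    try {
      Path srcDir = Files.createTempDirectory("src");
      Path destDir = Files.createTempDirectory("dest");
      Path jar = Files.write(srcDir.resolve("a.jar"), new byte[] {1, 2, 3});
      int ok = addJars(List.of(jar), destDir, 3).size();
      int failed = addJars(List.of(jar), destDir.resolve("missing"), 3).size();
      return new int[] {ok, failed};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println("uploaded=" + result[0] + " uploadedToMissingDir=" + result[1]);
  }
}
```

Returning a boolean instead of throwing leaves the policy to the caller, which is the trade-off raised above: the launcher can log the failure and still let the job try to run.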
Issue Time Tracking
-------------------
Worklog Id: (was: 934343)
Time Spent: 0.5h (was: 20m)
> Cache Yarn jars in GobblinYarnAppLauncher
> -----------------------------------------
>
> Key: GOBBLIN-2135
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2135
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Gobblin YARN Application Launcher lacks some functionality used in
> MRJobLauncher. One of the biggest gaps in feature parity is the absence of
> jar caching, where MRJobLauncher creates a monthly cache that is
> automatically cleaned up by executions that run two months later.
> YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15
> mins for a sizeable job to get all the jars), and because many jobs share
> the same jars, it makes sense to cache them together and only provide YARN
> the shared path.
> We also want to ensure that SNAPSHOT jars and other files are not uploaded
> to the cache, since they are not immutable, unlike jar versions on Artifactory.
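The caching policy described in the issue could be sketched along these lines. This is only a rough illustration under stated assumptions: the class JarCachePathSketch, its methods, the "/cache" and "/job/jars" paths, and the exact staleness rule (a monthly directory becomes eligible for cleanup once it is at least two months old) are all hypothetical and not taken from Gobblin's code.

```java
import java.time.YearMonth;

// Hypothetical sketch of the caching policy described in the issue; not Gobblin's actual code.
class JarCachePathSketch {

  // SNAPSHOT jars are mutable, so they go to a per-job (unshared) directory.
  // Everything else goes to a shared directory keyed by year and month, e.g. <cacheRoot>/2024-09.
  static String destDir(String jarName, String cacheRoot, String unsharedDir, YearMonth month) {
    if (jarName.contains("SNAPSHOT")) {
      return unsharedDir;
    }
    return cacheRoot + "/" + month;
  }

  // Assumption: a monthly cache directory is eligible for cleanup once it is at least two months old.
  static boolean isStale(YearMonth cacheMonth, YearMonth now) {
    return !cacheMonth.isAfter(now.minusMonths(2));
  }

  public static void main(String[] args) {
    YearMonth now = YearMonth.of(2024, 9);
    System.out.println(destDir("gobblin-core-1.0.jar", "/cache", "/job/jars", now));
    System.out.println(destDir("my-lib-1.0-SNAPSHOT.jar", "/cache", "/job/jars", now));
    System.out.println(isStale(YearMonth.of(2024, 7), now));
  }
}
```

Keying the shared directory by month keeps cleanup simple, since whole directories can be dropped at once, while routing SNAPSHOT jars elsewhere avoids serving a stale copy of a mutable artifact from the cache.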