Will-Lo commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1755496938
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
for (FileStatus status : fileStatusList) {
+ Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
- int retryCount = 0;
- boolean shouldFileBeAddedIntoDC = true;
- Path destJarFile = calculateDestJarFile(status, jarFileDir);
- // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
- while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- try {
- if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
- Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
- throw new IOException("Waiting for file to complete on uploading ... ");
- }
- // Set the first parameter as false for not deleting sourceFile
- // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
- // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
- this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
- } catch (IOException | InterruptedException e) {
- LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
- retryCount += 1;
- if (retryCount >= this.jarFileMaximumRetry) {
- LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
- // If retry reaches upper limit, skip copying this file.
- shouldFileBeAddedIntoDC = false;
- break;
- }
- }
- }
- if (shouldFileBeAddedIntoDC) {
+ if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+ } else {
+ LOG.error("Failed to upload jar file: " + status.getPath());
Review Comment:
We could throw an error here, but since another job may be uploading the
same jars at the same time, I think it is better to log the failure and let
the job attempt to run. If the job then fails, it should emit the failed
event anyway.
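
To make the trade-off concrete, here is a minimal sketch of the two policies. It is not the Gobblin implementation: local files stand in for HDFS, and `JarUploadSketch`, `uploadWithRetry`, `addJars`, and the `failFast` flag are hypothetical names used only for illustration. The retry loop mirrors the removed code in the hunk above (copy without overwrite, wait when the destination exists with a different size, give up after a bounded number of failures).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: local files stand in for HDFS, and none of these
// names come from the Gobblin code base.
final class JarUploadSketch {

  // Copies src to dest unless dest already exists with the same size.
  // Returns false once maxRetries failed attempts have been made.
  static boolean uploadWithRetry(Path src, Path dest, int maxRetries, long waitMillis) {
    int failures = 0;
    while (true) {
      try {
        if (Files.exists(dest)) {
          if (Files.size(dest) == Files.size(src)) {
            return true; // fully present, possibly uploaded by another job
          }
          // Size mismatch: another job may still be writing, so wait and re-check.
          Thread.sleep(waitMillis);
          throw new IOException("Waiting for upload to complete: " + dest);
        }
        // No REPLACE_EXISTING option: the copy fails if dest appears concurrently.
        Files.copy(src, dest);
      } catch (IOException e) {
        failures++;
        if (failures >= maxRetries) {
          return false;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  // Returns the jars that were uploaded and can be put on the classpath.
  // failFast = true throws on the first failed upload; failFast = false
  // logs the failure and lets the job attempt to run.
  static List<Path> addJars(List<Path> jars, Path destDir, int maxRetries, boolean failFast)
      throws IOException {
    List<Path> classpath = new ArrayList<>();
    for (Path jar : jars) {
      Path dest = destDir.resolve(jar.getFileName());
      if (uploadWithRetry(jar, dest, maxRetries, 1L)) {
        classpath.add(dest);
      } else if (failFast) {
        throw new IOException("Failed to upload jar file: " + jar);
      } else {
        System.err.println("Failed to upload jar file: " + jar);
      }
    }
    return classpath;
  }
}
```

With `failFast = false` a jar that cannot be uploaded is simply left off the classpath, which matches the behavior in this PR; with `failFast = true` the launcher would abort before the job is submitted.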