Will-Lo commented on code in PR #4030:
URL: https://github.com/apache/gobblin/pull/4030#discussion_r1755496938


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java:
##########
@@ -572,56 +573,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
     for (String jarFile : SPLITTER.split(jarFileList)) {
       Path srcJarFile = new Path(jarFile);
       FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);
-
       for (FileStatus status : fileStatusList) {
+        Path destJarFile = HdfsJarUploadUtils.calculateDestJarFile(fs, status, this.unsharedJarsDir, jarFileDir);
         // For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
         // or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
         // the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
-        int retryCount = 0;
-        boolean shouldFileBeAddedIntoDC = true;
-        Path destJarFile = calculateDestJarFile(status, jarFileDir);
-        // Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
-        while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-          try {
-            if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
-              Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
-              throw new IOException("Waiting for file to complete on uploading ... ");
-            }
-            // Set the first parameter as false for not deleting sourceFile
-            // Set the second parameter as false for not overwriting existing file on the target, by default it is true.
-            // If the file is preExisted but overwrite flag set to false, then an IOException if thrown.
-            this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
-          } catch (IOException | InterruptedException e) {
-            LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
-            retryCount += 1;
-            if (retryCount >= this.jarFileMaximumRetry) {
-              LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
-              // If retry reaches upper limit, skip copying this file.
-              shouldFileBeAddedIntoDC = false;
-              break;
-            }
-          }
-        }
-        if (shouldFileBeAddedIntoDC) {
+        if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
           // Then add the jar file on HDFS to the classpath
           LOG.info(String.format("Adding %s to classpath", destJarFile));
           DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
+        } else {
+          LOG.error("Failed to upload jar file: " + status.getPath());

Review Comment:
   Possibly we should throw an error here, but given that another job could be uploading the same jars, it might be better to let the job attempt to run anyway. If the job then fails, it should emit the failed event regardless.


