Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4939#discussion_r149071491
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
    @@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration 
conf, Map<String, String> ap
        }
     
        /**
    +    * Copy a local file to a remote file system.
    +    *
    +    * @param fs
    +    *              remote filesystem
    +    * @param appId
    +    *              application ID
    +    * @param localRsrcPath
    +    *              path to the local file
    +    * @param homedir
    +    *              remote home directory base (will be extended)
    +    * @param relativeTargetPath
    +    *              relative target path of the file (will be prefixed be 
the full home directory we set up)
    +    *
         * @return Path to remote file (usually hdfs)
    -    * @throws IOException
         */
    -   public static Path setupLocalResource(
    -                   FileSystem fs,
    -                   String appId, Path localRsrcPath,
    -                   LocalResource appMasterJar,
    -                   Path homedir) throws IOException {
    +   static Tuple2<Path, LocalResource> setupLocalResource(
    +           FileSystem fs,
    +           String appId,
    +           Path localRsrcPath,
    +           Path homedir,
    +           String relativeTargetPath) throws IOException {
    +
    +           if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
    +                   throw new IllegalArgumentException("File to copy must 
not be a directory: " +
    +                           localRsrcPath);
    +           }
     
                // copy resource to HDFS
    -           String suffix = ".flink/" + appId + "/" + 
localRsrcPath.getName();
    +           String suffix = ".flink/" + appId + "/" + relativeTargetPath + 
"/" + localRsrcPath.getName();
     
                Path dst = new Path(homedir, suffix);
     
                LOG.info("Copying from " + localRsrcPath + " to " + dst);
    -           fs.copyFromLocalFile(localRsrcPath, dst);
    -           registerLocalResource(fs, dst, appMasterJar);
    -           return dst;
    +
    +           fs.copyFromLocalFile(false, true, localRsrcPath, dst);
    +
    +           // now create the resource instance
    +           LocalResource resource = Records.newRecord(LocalResource.class);
    +           registerLocalResource(fs, dst, resource);
    +           return Tuple2.of(dst, resource);
        }
     
    -   public static void registerLocalResource(FileSystem fs, Path 
remoteRsrcPath, LocalResource localResource) throws IOException {
    +   private static void registerLocalResource(
    --- End diff --
    
    Touching this code, could we change it that we create and return a 
`LocalResource` in this method?


---

Reply via email to