[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4939 ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r151110083 --- Diff: flink-yarn/pom.xml ---

@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>

--- End diff -- IIRC avro was excluded since it was just not necessary for the file-system, in which case it should stay. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275387 --- Diff: flink-yarn/pom.xml (same `hadoop-aws` profile hunk as quoted above; the `avro` exclusion) --- End diff -- makes sense, I guess ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275161 --- Diff: flink-yarn/pom.xml ---

@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-annotations</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-core</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-databind</artifactId>
+					</exclusion>

--- End diff -- this dependency is, however, somehow related to `com.amazonaws`, not our code... ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149980302 --- Diff: flink-yarn/pom.xml (same `hadoop-aws` profile hunk as quoted above; the `jackson-*` exclusions) --- End diff -- Can't we enforce jackson 2.6 via dependency management? I think this would be cleaner than excluding the dependencies here and assume that `aws-java-sdk-s3` pulls in the missing dependencies. ---
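A `dependencyManagement` entry along the lines Till suggests could look roughly like this in `flink-yarn/pom.xml` (a sketch only; the concrete 2.6.x patch version is an assumption, not taken from the PR):

```xml
<dependencyManagement>
	<dependencies>
		<!-- pin the jackson version pulled in transitively by hadoop-aws /
		     aws-java-sdk-s3, instead of excluding the artifacts -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.6.7</version>
		</dependency>
		<!-- likewise for jackson-core and jackson-annotations -->
	</dependencies>
</dependencyManagement>
```

With this, the exclusions on the `hadoop-aws` dependency would no longer be needed, since Maven resolves the managed version for all transitive pulls of these artifacts.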
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149983029 --- Diff: flink-yarn/pom.xml (same `hadoop-aws` profile hunk as quoted above; the `avro` exclusion) --- End diff -- Shouldn't we override the avro version instead of excluding it? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149983025 --- Diff: flink-yarn/pom.xml (same `hadoop-aws` profile hunk as quoted above; the `jackson-*` exclusions) --- End diff -- It might then be necessary to add these dependencies explicitly here. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149084279 --- Diff: flink-yarn/pom.xml ---

@@ -99,6 +99,13 @@ under the License.
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-s3-fs-hadoop</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>

--- End diff -- I think it's also ok this way :-) ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149083811 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---

@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 *		remote filesystem
+	 * @param appId
+	 *		application ID
+	 * @param localRsrcPath
+	 *		path to the local file
+	 * @param homedir
+	 *		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 *		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-			FileSystem fs,
-			String appId, Path localRsrcPath,
-			LocalResource appMasterJar,
-			Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+			FileSystem fs,
+			String appId,
+			Path localRsrcPath,
+			Path homedir,
+			String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: "
+				+ localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();

--- End diff -- you could also check if the `relativeTargetPath` is empty. I would opt for the safe approach since I'm not sure whether this behaviour is specific to a Hadoop version or not. ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149078183 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -408,10 +437,12 @@ static ContainerLaunchContext createTaskExecutorContext( // prepare additional files to be shipped for (String pathStr : shipListString.split(",")) { if (!pathStr.isEmpty()) { + String[] pathWithKey = pathStr.split("="); --- End diff -- yes, if you see this as the order of the arguments in the array - I'll adapt the name ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149077720 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above, continuing with:)

 		Path dst = new Path(homedir, suffix);
 
 		LOG.info("Copying from " + localRsrcPath + " to " + dst);
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
+
+		fs.copyFromLocalFile(false, true, localRsrcPath, dst);
+
+		// now create the resource instance
+		LocalResource resource = Records.newRecord(LocalResource.class);
+		registerLocalResource(fs, dst, resource);
+		return Tuple2.of(dst, resource);
 	}
 
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(

--- End diff -- oh yes - totally... ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149077620 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above; relevant line:)

+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();

--- End diff -- Yes, but apparently this is filtered out by `new Path(homedir, suffix);` and the right path is created. I did stumble upon this as well and double-checked the results. The alternative is, for example, using `null` if no relative path is desired and setting the `suffix` appropriately, which is a bit more ugly but may also be cleaner/safer... Would you rather go this way? ---
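The empty-`relativeTargetPath` corner case can be illustrated with a plain-Java sketch (standalone code with hypothetical names, not the actual `Utils` implementation):

```java
public class SuffixSketch {

	// Build the staging suffix; only insert the extra segment if a relative
	// target path was actually given, so no "//" ever appears.
	static String suffix(String appId, String relativeTargetPath, String fileName) {
		if (relativeTargetPath == null || relativeTargetPath.isEmpty()) {
			return ".flink/" + appId + "/" + fileName;
		}
		return ".flink/" + appId + "/" + relativeTargetPath + "/" + fileName;
	}

	public static void main(String[] args) {
		// naive concatenation with an empty relative path produces a double slash
		String naive = ".flink/" + "app_1" + "/" + "" + "/" + "flink.jar";
		System.out.println(naive);                               // .flink/app_1//flink.jar

		System.out.println(suffix("app_1", "", "flink.jar"));    // .flink/app_1/flink.jar
		System.out.println(suffix("app_1", "lib", "flink.jar")); // .flink/app_1/lib/flink.jar
	}
}
```

As Nico notes in the thread, Hadoop's `new Path(homedir, suffix)` happens to filter the double slash out; an explicit guard like the one above simply avoids relying on that behaviour.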
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149076237 --- Diff: flink-yarn/pom.xml (same `flink-s3-fs-hadoop` test-dependency hunk as quoted above) --- End diff -- We could... I wasn't putting this test there because the project's pom.xml says `There is a separate "flink-yarn-tests" package that expects the "flink-dist" package to be build before.`, which we do not need for this test. Whatever you prefer... ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149070982 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above; relevant change:)

-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(
+			FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {

--- End diff -- Either put every parameter on a different line or please revert this change. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149069663 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -981,25 +998,54 @@ public ApplicationReport startAppMaster( return report; } - private static List uploadAndRegisterFiles( - Collection shipFiles, + /** +* Uploads and registers a single resource and adds it to localResources. +* +* @param key +* the key to add the resource under +* @param fs +* the remote file system to upload to +* @param appId +* application ID +* @param localRsrcPath +* local path to the file +* @param localResources +* map of resources +* +* @return the remote path to the uploaded resource +*/ + private static Path setupSingleLocalResource( + String key, FileSystem fs, - String appId, - List remotePaths, + ApplicationId appId, + Path localRsrcPath, --- End diff -- Typo `localSrcPath`? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149069107 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -705,11 +707,12 @@ public ApplicationReport startAppMaster( StringBuilder envShipFileList = new StringBuilder(); // upload and register ship files - List systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList); + List systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, + homeDir, appId, paths, localResources, envShipFileList); --- End diff -- nit: I would either not break lines or break after each argument. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149071810 --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.util.OperatingSystem; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Tests for verifying file staging during submission to YARN works. 
+ */ +public class YarnFileStageTest { + + @ClassRule + public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static MiniDFSCluster hdfsCluster; + + private static Path hdfsRootPath; + + private org.apache.hadoop.conf.Configuration hadoopConfig; + + // + // Test setup and shutdown + // + + @BeforeClass + public static void createHDFS() throws Exception { + Assume.assumeTrue(!OperatingSystem.isWindows()); + + final File tempDir = CLASS_TEMP_DIR.newFolder(); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + hdfsRootPath = new Path(hdfsCluster.getURI()); + } + + @AfterClass + public static void destroyHDFS() { + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + hdfsCluster = null; + hdfsRootPath = null; + } + + @Before + public void initConfig() { + hadoopConfig = new org.apache.hadoop.conf.Configuration(); + hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); + } + + /** +* Verifies that nested directories are properly copied with a hdfs:// file +* system (from a file:///absolute/path source path). +*/ + @Test + public void testCopyFromLocalRecursiveWithScheme() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true); + } + + /** +* Verifies that nested directories are properly copied with a hdfs:// file +*
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149070583 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above; relevant line:)

+			Path localRsrcPath,

--- End diff -- `localSrcPath`? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149073723 --- Diff: flink-yarn/pom.xml (same `flink-s3-fs-hadoop` test-dependency hunk as quoted above) --- End diff -- We could move the S3 upload test to `flink-yarn-tests`. That way we would only add this dependency to `flink-yarn-tests`. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149071491 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above; relevant change:)

-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(

--- End diff -- Touching this code, could we change it so that we create and return a `LocalResource` in this method? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149070829 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (same `setupLocalResource` hunk as quoted above; relevant line:)

+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();

--- End diff -- What if `relativeTargetPath` is `""`? Wouldn't that lead to `appId//localSrcPath.getName()`? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r149071109 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -408,10 +437,12 @@ static ContainerLaunchContext createTaskExecutorContext( // prepare additional files to be shipped for (String pathStr : shipListString.split(",")) { if (!pathStr.isEmpty()) { + String[] pathWithKey = pathStr.split("="); --- End diff -- Isn't this rather `keyWithPath`? ---
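A minimal sketch of the ship-list parsing being discussed (the `key=path` entry format and all names here are assumptions based on the diff, not the actual Flink code):

```java
public class ShipListSketch {

	// Parse one "key=path" entry; a limit of 2 keeps a '=' inside the path
	// from splitting the entry further.
	static String[] parseEntry(String pathStr) {
		return pathStr.split("=", 2);
	}

	public static void main(String[] args) {
		String shipListString =
			"lib/flink.jar=hdfs:///user/.flink/app_1/lib/flink.jar," +
			"log4j.properties=hdfs:///user/.flink/app_1/log4j.properties";

		// prepare additional files to be shipped (mirrors the loop in the diff)
		for (String pathStr : shipListString.split(",")) {
			if (!pathStr.isEmpty()) {
				String[] keyWithPath = parseEntry(pathStr);
				System.out.println(keyWithPath[0] + " -> " + keyWithPath[1]);
			}
		}
	}
}
```

Splitting with a limit of 2 is a defensive choice in this sketch; the diff itself uses a bare `split("=")`.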
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148756933 --- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml --- @@ -182,6 +182,21 @@ under the License. ${project.version} test + --- End diff -- Yes this is about the recursive upload which needs to be tested once with hdfs and once more with s3. Sure we could flip the dependency and let the tests in the `yarn` sub-project depend on `flink-s3-fs-hadoop` (and I don't mind which depends on which, actually) but wouldn't this be just the same but in reverse? ---
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148756295 --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java --- @@ -57,11 +62,52 @@ private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY"); private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY"); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass - public static void checkIfCredentialsArePresent() { + public static void checkCredentialsAndSetup() throws IOException { + // check whether credentials exist Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null); Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null); Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null); + + // initialize configuration with valid credentials --- End diff -- This is actually not for the new test, but for the cleanup: the current state of the `HadoopS3FileSystemITCase` leaves this (random) test directory behind. In order to delete this after the tests of the class finished, I thought we should make sure that it did not exist before so that we are not deleting something we shouldn't! ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148656504 --- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml --- @@ -182,6 +182,21 @@ under the License. ${project.version} test + --- End diff -- Would be great if we can avoid adding these dependencies. This couples projects that were really meant to be independent, even if just in test scope. If this is about testing recursive upload, can this be written properly as a test case in this project? Or can the Yarn upload test be completely in the yarn test project, adding a dependency on this s3 project? ---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r148656918 --- Diff: flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java --- @@ -57,11 +62,52 @@ private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY"); private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY"); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + @BeforeClass - public static void checkIfCredentialsArePresent() { + public static void checkCredentialsAndSetup() throws IOException { + // check whether credentials exist Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null); Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null); Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null); + + // initialize configuration with valid credentials --- End diff -- I would suggest to move this out of the "setup" method into the actual test. The setup logic is not shared (all other test methods don't assume that setup) and it also assumes existence of functionality that is tested in other test methods.. ---
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4939

[FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs

## What is the purpose of the change

If YARN is configured to use the `s3a` default file system, the upload of the Flink jars fails because `org.apache.hadoop.fs.FileSystem#copyFromLocalFile()` does not work recursively on the given `lib` folder.

## Brief change log

- implement our own recursive upload (based on #2288)
- add unit tests to verify its behaviour for both `hdfs://` and `s3://` (via S3A) resource uploads

## Verifying this change

This change added tests and can be verified as follows:

- added a unit test for HDFS uploads via our `MiniDFSCluster`
- added an integration test to verify S3 uploads (via the S3A filesystem implementation of the `flink-s3-fs-hadoop` sub-project)
- manually verified the test on YARN with both S3A and HDFS default file systems being set

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes - internally)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-4228

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4939.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4939

commit 5d31f41e0e480820e9fec1efa84e5725364a136d
Author: Nico Kruber
Date: 2017-11-02T18:38:48Z

    [hotfix][s3] fix HadoopS3FileSystemITCase leaving test directories behind in S3

commit bf47d376397a8e64625a031468d5f5d0a5486238
Author: Nico Kruber
Date: 2016-11-09T20:04:50Z

    [FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

    + includes a new unit test for recursive uploads to hdfs:// targets
    + adds a unit test for recursive file uploads to s3:// via s3a

---
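The recursive upload the PR implements can be illustrated with a local-filesystem sketch: instead of calling Hadoop's `FileSystem#copyFromLocalFile` on a directory (which, per the description above, fails for `s3a`), walk the directory tree and copy file by file. This uses `java.nio.file` so it runs standalone; the real change presumably applies the same idea against the remote file system.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecursiveCopySketch {

	// Walk the source tree and copy it entry by entry -- the same idea the PR
	// applies with a per-file remote upload instead of Files.copy.
	static void copyRecursive(Path src, Path dst) throws IOException {
		final List<Path> paths;
		try (Stream<Path> stream = Files.walk(src)) {
			paths = stream.collect(Collectors.toList());
		}
		for (Path p : paths) {
			Path target = dst.resolve(src.relativize(p).toString());
			if (Files.isDirectory(p)) {
				Files.createDirectories(target);
			} else {
				Files.createDirectories(target.getParent());
				Files.copy(p, target, StandardCopyOption.REPLACE_EXISTING);
			}
		}
	}

	public static void main(String[] args) throws IOException {
		Path src = Files.createTempDirectory("lib-src");
		Files.createDirectories(src.resolve("nested"));
		Files.write(src.resolve("nested/flink.jar"), "jar-bytes".getBytes());

		Path dst = Files.createTempDirectory("staging");
		copyRecursive(src, dst);

		System.out.println(Files.exists(dst.resolve("nested/flink.jar"))); // true
	}
}
```

Directories are visited before their contents by `Files.walk`, so creating each directory as it appears is enough to make the per-file copies succeed.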