[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ includes a new unit test for recursive uploads to hdfs:// targets
+ adds a unit test for recursive file uploads to s3:// via s3a

[FLINK-4228][yarn/s3] turn the dependencies around

Instead of having flink-s3-fs-hadoop depend on flink-yarn_<scala_version>,
let flink-yarn depend on the S3 filesystem and implement the test there.
This is safer with regard to the Scala-independent flink-s3-fs-hadoop project.

[FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations

This is how YARN would use them and what should really be tested.

[FLINK-4228][yarn] enable S3 tests for newer Hadoop versions

- requires the 'include_hadoop_aws' build profile (or property) to be set
- requires a newer aws-sdk version (than the one Hadoop pulls in) to work with
  our httpcomponents version
- also adds a check that at least one S3 implementation is actually tested, so
  that the tests are not silently skipped because of such a missing dependency

This closes #4939.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36b80756
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36b80756
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36b80756

Branch: refs/heads/release-1.4
Commit: 36b807567d93c431c1498241a42c20221cb6a664
Parents: 666b1b2
Author: Nico Kruber <n...@data-artisans.com>
Authored: Wed Nov 9 15:04:50 2016 -0500
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Nov 18 10:58:53 2017 +0100

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 flink-yarn/pom.xml                              |  63 ++++++
 .../yarn/AbstractYarnClusterDescriptor.java     | 192 +++++++++++-----
 .../main/java/org/apache/flink/yarn/Utils.java  |  86 +++++---
 .../apache/flink/yarn/YarnFileStageTest.java    | 218 ++++++++++++++++++
 .../flink/yarn/YarnFileStageTestS3ITCase.java   | 220 +++++++++++++++++++
 6 files changed, 698 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index daf2186..5e2ef74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,7 +59,7 @@ matrix:
     - jdk: "oraclejdk8"
       env:
         - TEST="misc"
-        - PROFILE="-Dhadoop.version=2.8.0"
+        - PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws"
         - CACHE_NAME=JDK8_H280_M
     - jdk: "openjdk8"
       env:

http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index db6ee88..f621acf 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -31,6 +31,12 @@ under the License.
        <name>flink-yarn</name>
        <packaging>jar</packaging>
 
+       <properties>
+               <!-- for testing (will override Hadoop's default dependency on too low SDK versions that
+                       do not work with our httpcomponents version) -->
+               <aws.sdk.version>1.11.171</aws.sdk.version>
+       </properties>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -153,6 +159,63 @@ under the License.
                                </plugins>
                        </build>
                </profile>
+
+		<profile>
+			<!-- Hadoop >= 2.6 moved the S3 file systems from hadoop-common into hadoop-aws artifact
+				(see https://issues.apache.org/jira/browse/HADOOP-11074)
+				We can add the (test) dependency per default once 2.6 is the minimum required version.
+			-->
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<!-- for the S3 tests of YarnFileStageTestS3ITCase -->
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<!-- The aws-java-sdk-core requires jackson 2.6, but
+							hadoop pulls in 2.3 -->
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+				<!-- override Hadoop's default dependency on too low SDK versions that do not work
+					with our httpcomponents version when initialising the s3a file system -->
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-s3</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-sts</artifactId>
+					<version>${aws.sdk.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+
        </profiles>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8ecc371..5ac5c4e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
@@ -624,6 +625,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                // Copy the application master jar to the filesystem
                // Create a local resource to point to the destination jar path
                final FileSystem fs = FileSystem.get(conf);
+               final Path homeDir = fs.getHomeDirectory();
 
                // hard coded check for the GoogleHDFS client because its not 
overriding the getScheme() method.
                if 
(!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
@@ -705,11 +707,25 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                StringBuilder envShipFileList = new StringBuilder();
 
                // upload and register ship files
-               List<String> systemClassPaths = 
uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, 
localResources, envShipFileList);
+               List<String> systemClassPaths = uploadAndRegisterFiles(
+                       systemShipFiles,
+                       fs,
+                       homeDir,
+                       appId,
+                       paths,
+                       localResources,
+                       envShipFileList);
 
                List<String> userClassPaths;
                if (userJarInclusion != 
YarnConfigOptions.UserJarInclusion.DISABLED) {
-                       userClassPaths = uploadAndRegisterFiles(userJarFiles, 
fs, appId.toString(), paths, localResources, envShipFileList);
+                       userClassPaths = uploadAndRegisterFiles(
+                               userJarFiles,
+                               fs,
+                               homeDir,
+                               appId,
+                               paths,
+                               localResources,
+                               envShipFileList);
                } else {
                        userClassPaths = Collections.emptyList();
                }
@@ -739,32 +755,29 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
 
                // Setup jar for ApplicationMaster
-               LocalResource appMasterJar = 
Records.newRecord(LocalResource.class);
-               Path remotePathJar = Utils.setupLocalResource(
+               Path remotePathJar = setupSingleLocalResource(
+                       "flink.jar",
                        fs,
-                       appId.toString(),
+                       appId,
                        flinkJarPath,
-                       appMasterJar,
-                       fs.getHomeDirectory());
-
-               localResources.put("flink.jar", appMasterJar);
+                       localResources,
+                       homeDir,
+                       "");
 
                // Upload the flink configuration
-               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
-
                // write out configuration file
                File tmpConfigurationFile = File.createTempFile(appId + 
"-flink-conf.yaml", null);
                tmpConfigurationFile.deleteOnExit();
                BootstrapTools.writeConfiguration(flinkConfiguration, 
tmpConfigurationFile);
 
-               Path remotePathConf = Utils.setupLocalResource(
+               Path remotePathConf = setupSingleLocalResource(
+                       "flink-conf.yaml",
                        fs,
-                       appId.toString(),
+                       appId,
                        new Path(tmpConfigurationFile.getAbsolutePath()),
-                       flinkConf,
-                       fs.getHomeDirectory());
-
-               localResources.put("flink-conf.yaml", flinkConf);
+                       localResources,
+                       homeDir,
+                       "");
 
                paths.add(remotePathJar);
                classPathBuilder.append("flink.jar").append(File.pathSeparator);
@@ -781,11 +794,16 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                                        ObjectOutputStream obOutput = new 
ObjectOutputStream(output);){
                                        obOutput.writeObject(jobGraph);
                                }
-                               LocalResource jobgraph = 
Records.newRecord(LocalResource.class);
-                               Path remoteJobGraph =
-                                               Utils.setupLocalResource(fs, 
appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory());
-                               localResources.put("job.graph", jobgraph);
-                               paths.add(remoteJobGraph);
+
+                               Path pathFromYarnURL = setupSingleLocalResource(
+                                       "job.graph",
+                                       fs,
+                                       appId,
+                                       new Path(fp.toURI()),
+                                       localResources,
+                                       homeDir,
+                                       "");
+                               paths.add(pathFromYarnURL);
                                
classPathBuilder.append("job.graph").append(File.pathSeparator);
                        } catch (Exception e) {
                                LOG.warn("Add job graph to local resource 
fail");
@@ -793,7 +811,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + 
appId + '/');
+               Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');
 
                FsPermission permission = new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE);
                fs.setPermission(yarnFilesDir, permission); // set permission 
for path.
@@ -810,32 +828,44 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        if (krb5Config != null && krb5Config.length() != 0) {
                                File krb5 = new File(krb5Config);
                                LOG.info("Adding KRB5 configuration {} to the 
AM container local resource bucket", krb5.getAbsolutePath());
-                               LocalResource krb5ConfResource = 
Records.newRecord(LocalResource.class);
                                Path krb5ConfPath = new 
Path(krb5.getAbsolutePath());
-                               remoteKrb5Path = Utils.setupLocalResource(fs, 
appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
-                               localResources.put(Utils.KRB5_FILE_NAME, 
krb5ConfResource);
+                               remoteKrb5Path = setupSingleLocalResource(
+                                       Utils.KRB5_FILE_NAME,
+                                       fs,
+                                       appId,
+                                       krb5ConfPath,
+                                       localResources,
+                                       homeDir,
+                                       "");
 
                                File f = new 
File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
                                LOG.info("Adding Yarn configuration {} to the 
AM container local resource bucket", f.getAbsolutePath());
-                               LocalResource yarnConfResource = 
Records.newRecord(LocalResource.class);
                                Path yarnSitePath = new 
Path(f.getAbsolutePath());
-                               remoteYarnSiteXmlPath = 
Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, 
fs.getHomeDirectory());
-                               localResources.put(Utils.YARN_SITE_FILE_NAME, 
yarnConfResource);
-
+                               remoteYarnSiteXmlPath = 
setupSingleLocalResource(
+                                       Utils.YARN_SITE_FILE_NAME,
+                                       fs,
+                                       appId,
+                                       yarnSitePath,
+                                       localResources,
+                                       homeDir,
+                                       "");
                                hasKrb5 = true;
                        }
                }
 
                // setup security tokens
-               LocalResource keytabResource = null;
                Path remotePathKeytab = null;
                String keytab = 
flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
                if (keytab != null) {
                        LOG.info("Adding keytab {} to the AM container local 
resource bucket", keytab);
-                       keytabResource = Records.newRecord(LocalResource.class);
-                       Path keytabPath = new Path(keytab);
-                       remotePathKeytab = Utils.setupLocalResource(fs, 
appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
-                       localResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource);
+                       remotePathKeytab = setupSingleLocalResource(
+                               Utils.KEYTAB_FILE_NAME,
+                               fs,
+                               appId,
+                               new Path(keytab),
+                               localResources,
+                               homeDir,
+                               "");
                }
 
                final ContainerLaunchContext amContainer = 
setupApplicationMasterContainer(
@@ -866,7 +896,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, 
String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
                appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, 
remotePathJar.toString());
                appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
fs.getHomeDirectory().toString());
+               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
homeDir.toString());
                appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, 
envShipFileList.toString());
                appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, 
String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
                appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, 
String.valueOf(detached));
@@ -876,7 +906,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                // 
https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
                appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
 
-               if (keytabResource != null) {
+               if (remotePathKeytab != null) {
                        appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remotePathKeytab.toString());
                        String principal = 
flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
                        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
principal);
@@ -981,25 +1011,54 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                return report;
        }
 
-       private static List<String> uploadAndRegisterFiles(
-                       Collection<File> shipFiles,
+       /**
+        * Uploads and registers a single resource and adds it to 
<tt>localResources</tt>.
+        *
+        * @param key
+        *              the key to add the resource under
+        * @param fs
+        *              the remote file system to upload to
+        * @param appId
+        *              application ID
+        * @param localSrcPath
+        *              local path to the file
+        * @param localResources
+        *              map of resources
+        *
+        * @return the remote path to the uploaded resource
+        */
+       private static Path setupSingleLocalResource(
+                       String key,
                        FileSystem fs,
-                       String appId,
-                       List<Path> remotePaths,
+                       ApplicationId appId,
+                       Path localSrcPath,
                        Map<String, LocalResource> localResources,
-                       StringBuilder envShipFileList) throws IOException {
-               final List<String> classPaths = new ArrayList<>(2 + 
shipFiles.size());
-               for (File shipFile : shipFiles) {
-                       LocalResource shipResources = 
Records.newRecord(LocalResource.class);
+                       Path targetHomeDir,
+                       String relativeTargetPath) throws IOException, 
URISyntaxException {
 
-                       Path shipLocalPath = new Path("file://" + 
shipFile.getAbsolutePath());
-                       Path remotePath =
-                               Utils.setupLocalResource(fs, appId, 
shipLocalPath, shipResources, fs.getHomeDirectory());
+               Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
+                       fs,
+                       appId.toString(),
+                       localSrcPath,
+                       targetHomeDir,
+                       relativeTargetPath);
 
-                       remotePaths.add(remotePath);
+               localResources.put(key, resource.f1);
 
-                       localResources.put(shipFile.getName(), shipResources);
+               return resource.f0;
+       }
+
+       static List<String> uploadAndRegisterFiles(
+                       Collection<File> shipFiles,
+                       FileSystem fs,
+                       Path targetHomeDir,
+                       ApplicationId appId,
+                       List<Path> remotePaths,
+                       Map<String, LocalResource> localResources,
+                       StringBuilder envShipFileList) throws IOException, 
URISyntaxException {
 
+               final List<String> classPaths = new ArrayList<>(2 + 
shipFiles.size());
+               for (File shipFile : shipFiles) {
                        if (shipFile.isDirectory()) {
                                // add directories to the classpath
                                java.nio.file.Path shipPath = shipFile.toPath();
@@ -1011,17 +1070,40 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                                                throws IOException {
                                                java.nio.file.Path relativePath 
= parentPath.relativize(file);
 
-                                               
classPaths.add(relativePath.toString());
-
-                                               return FileVisitResult.CONTINUE;
+                                               String key = 
relativePath.toString();
+                                               try {
+                                                       Path remotePath = 
setupSingleLocalResource(
+                                                               key,
+                                                               fs,
+                                                               appId,
+                                                               new 
Path(file.toUri()),
+                                                               localResources,
+                                                               targetHomeDir,
+                                                               
relativePath.getParent().toString());
+                                                       
remotePaths.add(remotePath);
+                                                       
envShipFileList.append(key).append("=").append(remotePath).append(",");
+
+                                                       // add files to the 
classpath
+                                                       classPaths.add(key);
+
+                                                       return 
FileVisitResult.CONTINUE;
+                                               } catch (URISyntaxException e) {
+                                                       throw new 
IOException(e);
+                                               }
                                        }
                                });
                        } else {
+                               Path shipLocalPath = new Path("file://" + 
shipFile.getAbsolutePath());
+                               String key = shipFile.getName();
+                               Path remotePath = setupSingleLocalResource(
+                                       key, fs, appId, shipLocalPath, 
localResources, targetHomeDir, "");
+                               remotePaths.add(remotePath);
+                               
envShipFileList.append(key).append("=").append(remotePath).append(",");
+
                                // add files to the classpath
-                               classPaths.add(shipFile.getName());
+                               classPaths.add(key);
                        }
 
-                       envShipFileList.append(remotePath).append(",");
                }
                return classPaths;
        }
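
For orientation, a minimal, hypothetical sketch (not part of this patch) of the ship-file list format that uploadAndRegisterFiles now writes into the AM environment: one "key=remotePath" entry per uploaded file, with the key being the classpath-relative name, and the same entries split apart again on the TaskManager side (see the Utils hunk below). The class name and the example paths are invented for illustration:

import java.util.LinkedHashMap;
import java.util.Map;

public class ShipFileListFormatSketch {

	public static void main(String[] args) {
		// building the list, as the client side does: append "<key>=<remotePath>," per file
		StringBuilder envShipFileList = new StringBuilder();
		envShipFileList.append("lib/log4j.jar").append("=")
			.append("hdfs:///user/flink/.flink/application_0001/lib/log4j.jar").append(",");
		envShipFileList.append("flink-conf.yaml").append("=")
			.append("hdfs:///user/flink/.flink/application_0001/flink-conf.yaml").append(",");

		// parsing it back, as the TaskManager-side code in Utils does
		Map<String, String> resources = new LinkedHashMap<>();
		for (String entry : envShipFileList.toString().split(",")) {
			if (!entry.isEmpty()) {
				String[] keyAndPath = entry.split("=");
				resources.put(keyAndPath[0], keyAndPath[1]);
			}
		}
		resources.forEach((key, path) -> System.out.println(key + " -> " + path));
	}
}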

http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 32cbb64..652afec 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -117,33 +118,60 @@ public final class Utils {
        }
 
        /**
+        * Copy a local file to a remote file system.
+        *
+        * @param fs
+        *              remote filesystem
+        * @param appId
+        *              application ID
+        * @param localSrcPath
+        *              path to the local file
+        * @param homedir
+        *              remote home directory base (will be extended)
+        * @param relativeTargetPath
+        *              relative target path of the file (will be prefixed be 
the full home directory we set up)
+        *
         * @return Path to remote file (usually hdfs)
-        * @throws IOException
         */
-       public static Path setupLocalResource(
-                       FileSystem fs,
-                       String appId, Path localRsrcPath,
-                       LocalResource appMasterJar,
-                       Path homedir) throws IOException {
+       static Tuple2<Path, LocalResource> setupLocalResource(
+               FileSystem fs,
+               String appId,
+               Path localSrcPath,
+               Path homedir,
+               String relativeTargetPath) throws IOException {
+
+               if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+                       throw new IllegalArgumentException("File to copy must 
not be a directory: " +
+                               localSrcPath);
+               }
 
                // copy resource to HDFS
-               String suffix = ".flink/" + appId + "/" + 
localRsrcPath.getName();
+               String suffix =
+                       ".flink/"
+                               + appId
+                               + (relativeTargetPath.isEmpty() ? "" : "/" + 
relativeTargetPath)
+                               + "/" + localSrcPath.getName();
 
                Path dst = new Path(homedir, suffix);
 
-               LOG.info("Copying from " + localRsrcPath + " to " + dst);
-               fs.copyFromLocalFile(localRsrcPath, dst);
-               registerLocalResource(fs, dst, appMasterJar);
-               return dst;
+               LOG.info("Copying from " + localSrcPath + " to " + dst);
+
+               fs.copyFromLocalFile(false, true, localSrcPath, dst);
+
+               // now create the resource instance
+               LocalResource resource = registerLocalResource(fs, dst);
+               return Tuple2.of(dst, resource);
        }
 
-       public static void registerLocalResource(FileSystem fs, Path 
remoteRsrcPath, LocalResource localResource) throws IOException {
+       private static LocalResource registerLocalResource(FileSystem fs, Path 
remoteRsrcPath) throws IOException {
+               LocalResource localResource = 
Records.newRecord(LocalResource.class);
                FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
                
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
                localResource.setSize(jarStat.getLen());
                localResource.setTimestamp(jarStat.getModificationTime());
                localResource.setType(LocalResourceType.FILE);
                
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+               return localResource;
        }
 
        public static void setTokensFor(ContainerLaunchContext amContainer, 
List<Path> paths, Configuration conf) throws IOException {
@@ -340,10 +368,9 @@ public final class Utils {
                LocalResource keytabResource = null;
                if (remoteKeytabPath != null) {
                        log.info("Adding keytab {} to the AM container local 
resource bucket", remoteKeytabPath);
-                       keytabResource = Records.newRecord(LocalResource.class);
                        Path keytabPath = new Path(remoteKeytabPath);
                        FileSystem fs = keytabPath.getFileSystem(yarnConfig);
-                       registerLocalResource(fs, keytabPath, keytabResource);
+                       keytabResource = registerLocalResource(fs, keytabPath);
                }
 
                //To support Yarn Secure Integration Test Scenario
@@ -352,30 +379,28 @@ public final class Utils {
                boolean hasKrb5 = false;
                if (remoteYarnConfPath != null && remoteKrb5Path != null) {
                        log.info("TM:Adding remoteYarnConfPath {} to the 
container local resource bucket", remoteYarnConfPath);
-                       yarnConfResource = 
Records.newRecord(LocalResource.class);
                        Path yarnConfPath = new Path(remoteYarnConfPath);
                        FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
-                       registerLocalResource(fs, yarnConfPath, 
yarnConfResource);
+                       yarnConfResource = registerLocalResource(fs, 
yarnConfPath);
 
                        log.info("TM:Adding remoteKrb5Path {} to the container 
local resource bucket", remoteKrb5Path);
-                       krb5ConfResource = 
Records.newRecord(LocalResource.class);
                        Path krb5ConfPath = new Path(remoteKrb5Path);
                        fs = krb5ConfPath.getFileSystem(yarnConfig);
-                       registerLocalResource(fs, krb5ConfPath, 
krb5ConfResource);
+                       krb5ConfResource = registerLocalResource(fs, 
krb5ConfPath);
 
                        hasKrb5 = true;
                }
 
                // register Flink Jar with remote HDFS
-               LocalResource flinkJar = Records.newRecord(LocalResource.class);
+               final LocalResource flinkJar;
                {
                        Path remoteJarPath = new Path(remoteFlinkJarPath);
                        FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
-                       registerLocalResource(fs, remoteJarPath, flinkJar);
+                       flinkJar = registerLocalResource(fs, remoteJarPath);
                }
 
                // register conf with local fs
-               LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
+               final LocalResource flinkConf;
                {
                        // write the TaskManager configuration to a local file
                        final File taskManagerConfigFile =
@@ -385,8 +410,13 @@ public final class Utils {
 
                        Path homeDirPath = new Path(clientHomeDir);
                        FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
-                       setupLocalResource(fs, appId,
-                                       new 
Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+                       flinkConf = setupLocalResource(
+                               fs,
+                               appId,
+                               new Path(taskManagerConfigFile.toURI()),
+                               homeDirPath,
+                               "").f1;
 
                        log.info("Prepared local resource for modified yaml: 
{}", flinkConf);
                }
@@ -408,10 +438,11 @@ public final class Utils {
                // prepare additional files to be shipped
                for (String pathStr : shipListString.split(",")) {
                        if (!pathStr.isEmpty()) {
-                               LocalResource resource = 
Records.newRecord(LocalResource.class);
-                               Path path = new Path(pathStr);
-                               
registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
-                               taskManagerLocalResources.put(path.getName(), 
resource);
+                               String[] keyAndPath = pathStr.split("=");
+                               require(keyAndPath.length == 2, "Invalid entry 
in ship file list: %s", pathStr);
+                               Path path = new Path(keyAndPath[1]);
+                               LocalResource resource = 
registerLocalResource(path.getFileSystem(yarnConfig), path);
+                               taskManagerLocalResources.put(keyAndPath[0], 
resource);
                        }
                }
 
@@ -488,4 +519,5 @@ public final class Utils {
                        throw new RuntimeException(String.format(message, 
values));
                }
        }
+
 }
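
A hypothetical usage sketch (not part of this patch) of the refactored, now package-private Utils.setupLocalResource: it uploads a single local file and returns the remote path together with the registered LocalResource, placing the file under <homeDir>/.flink/<appId>/<relativeTargetPath>/<fileName>. The class name, application id and local file path are invented, and the local file is assumed to exist:

package org.apache.flink.yarn;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;

public class SetupLocalResourceSketch {

	public static void main(String[] args) throws Exception {
		// the default file system (HDFS on a real cluster, the local fs otherwise)
		FileSystem fs = FileSystem.get(new Configuration());
		Path homeDir = fs.getHomeDirectory();

		// uploads the file and creates the YARN resource descriptor in one step
		Tuple2<Path, LocalResource> resource = Utils.setupLocalResource(
			fs,
			"application_1510000000000_0001",          // appId.toString()
			new Path("file:///tmp/staging/nested/3"),  // local source file (must not be a directory)
			homeDir,                                   // remote target base directory
			"nested");                                 // relative target path below .flink/<appId>/

		// resource.f0 -> <homeDir>/.flink/application_1510000000000_0001/nested/3
		// resource.f1 -> the LocalResource registered for that uploaded file
		System.out.println(resource.f0);
	}
}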

http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
new file mode 100644
index 0000000..4d38253
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for verifying file staging during submission to YARN works.
+ */
+public class YarnFileStageTest extends TestLogger {
+
+       @ClassRule
+       public static final TemporaryFolder CLASS_TEMP_DIR = new 
TemporaryFolder();
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static MiniDFSCluster hdfsCluster;
+
+       private static Path hdfsRootPath;
+
+       private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+       // 
------------------------------------------------------------------------
+       //  Test setup and shutdown
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               Assume.assumeTrue(!OperatingSystem.isWindows());
+
+               final File tempDir = CLASS_TEMP_DIR.newFolder();
+
+               org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+               hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
tempDir.getAbsolutePath());
+
+               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+               hdfsCluster = builder.build();
+               hdfsRootPath = new Path(hdfsCluster.getURI());
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               if (hdfsCluster != null) {
+                       hdfsCluster.shutdown();
+               }
+               hdfsCluster = null;
+               hdfsRootPath = null;
+       }
+
+       @Before
+       public void initConfig() {
+               hadoopConfig = new org.apache.hadoop.conf.Configuration();
+               
hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, 
hdfsRootPath.toString());
+       }
+
+       /**
+        * Verifies that nested directories are properly copied with a 
<tt>hdfs://</tt> file
+        * system (from a <tt>file:///absolute/path</tt> source path).
+        */
+       @Test
+       public void testCopyFromLocalRecursiveWithScheme() throws Exception {
+               final FileSystem targetFileSystem = 
hdfsRootPath.getFileSystem(hadoopConfig);
+               final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+               testCopyFromLocalRecursive(targetFileSystem, targetDir, 
tempFolder, true);
+       }
+
+       /**
+        * Verifies that nested directories are properly copied with a 
<tt>hdfs://</tt> file
+        * system (from a <tt>/absolute/path</tt> source path).
+        */
+       @Test
+       public void testCopyFromLocalRecursiveWithoutScheme() throws Exception {
+               final FileSystem targetFileSystem = 
hdfsRootPath.getFileSystem(hadoopConfig);
+               final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+               testCopyFromLocalRecursive(targetFileSystem, targetDir, 
tempFolder, false);
+       }
+
+       /**
+        * Verifies that nested directories are properly copied with the given 
filesystem and paths.
+        *
+        * @param targetFileSystem
+        *              file system of the target path
+        * @param targetDir
+        *              target path (URI like <tt>hdfs://...</tt>)
+        * @param tempFolder
+        *              JUnit temporary folder rule to create the source 
directory with
+        * @param addSchemeToLocalPath
+        *              whether add the <tt>file://</tt> scheme to the local 
path to copy from
+        */
+       public static void testCopyFromLocalRecursive(
+                       FileSystem targetFileSystem,
+                       Path targetDir,
+                       TemporaryFolder tempFolder,
+                       boolean addSchemeToLocalPath) throws Exception {
+
+               // directory must not yet exist
+               assertFalse(targetFileSystem.exists(targetDir));
+
+               final File srcDir = tempFolder.newFolder();
+               final Path srcPath;
+               if (addSchemeToLocalPath) {
+                       srcPath = new Path("file://" + 
srcDir.getAbsolutePath());
+               } else {
+                       srcPath = new Path(srcDir.getAbsolutePath());
+               }
+
+               HashMap<String /* (relative) path */, /* contents */ String> 
srcFiles = new HashMap<>(4);
+
+               // create and fill source files
+               srcFiles.put("1", "Hello 1");
+               srcFiles.put("2", "Hello 2");
+               srcFiles.put("nested/3", "Hello nested/3");
+               srcFiles.put("nested/4/5", "Hello nested/4/5");
+               for (Map.Entry<String, String> src : srcFiles.entrySet()) {
+                       File file = new File(srcDir, src.getKey());
+                       //noinspection ResultOfMethodCallIgnored
+                       file.getParentFile().mkdirs();
+                       try (DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file))) {
+                               out.writeUTF(src.getValue());
+                       }
+               }
+
+               // copy the created directory recursively:
+               try {
+                       List<Path> remotePaths = new ArrayList<>();
+                       HashMap<String, LocalResource> localResources = new 
HashMap<>();
+                       AbstractYarnClusterDescriptor.uploadAndRegisterFiles(
+                               Collections.singletonList(new 
File(srcPath.toUri().getPath())),
+                               targetFileSystem,
+                               targetDir,
+                               ApplicationId.newInstance(0, 0),
+                               remotePaths,
+                               localResources,
+                               new StringBuilder());
+                       assertEquals(srcFiles.size(), localResources.size());
+
+                       Path workDir = ConverterUtils
+                               
.getPathFromYarnURL(localResources.get(srcPath.getName() + "/1").getResource())
+                               .getParent();
+
+                       RemoteIterator<LocatedFileStatus> targetFilesIterator =
+                               targetFileSystem.listFiles(workDir, true);
+                       HashMap<String /* (relative) path */, /* contents */ 
String> targetFiles =
+                               new HashMap<>(4);
+
+                       final int workDirPrefixLength =
+                               workDir.toString().length() + 1; // one more 
for the concluding "/"
+                       while (targetFilesIterator.hasNext()) {
+                               LocatedFileStatus targetFile = 
targetFilesIterator.next();
+
+                               try (FSDataInputStream in = 
targetFileSystem.open(targetFile.getPath())) {
+                                       String absolutePathString = 
targetFile.getPath().toString();
+                                       String relativePath = 
absolutePathString.substring(workDirPrefixLength);
+                                       targetFiles.put(relativePath, 
in.readUTF());
+
+                                       assertEquals("extraneous data in file " 
+ relativePath, -1, in.read());
+                               }
+                       }
+
+                       assertThat(targetFiles, equalTo(srcFiles));
+               } finally {
+                       // clean up
+                       targetFileSystem.delete(targetDir, true);
+               }
+       }
+}
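
testCopyFromLocalRecursive is public and static so that tests for other file systems can reuse it, which is exactly what the S3 test below does. A minimal, hypothetical sketch driving it against the local file system instead; the class and directory names are invented and the target directory is assumed not to exist yet:

package org.apache.flink.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class LocalFsFileStageSketch {

	@Rule
	public final TemporaryFolder tempFolder = new TemporaryFolder();

	@Test
	public void testRecursiveCopyOnLocalFs() throws Exception {
		FileSystem localFs = FileSystem.getLocal(new Configuration());
		// the helper expects a non-existing target directory and cleans it up afterwards
		Path targetDir = new Path(localFs.getWorkingDirectory(), "yarn-staging-sketch");

		YarnFileStageTest.testCopyFromLocalRecursive(localFs, targetDir, tempFolder, true);
	}
}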

http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
new file mode 100644
index 0000000..74fb596
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNoException;
+
+/**
+ * Tests for verifying file staging during submission to YARN works with the 
S3A file system.
+ *
+ * <p>Note that the setup is similar to 
<tt>org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase</tt>.
+ */
+public class YarnFileStageTestS3ITCase extends TestLogger {
+
+       private static final String BUCKET = 
System.getenv("ARTIFACTS_AWS_BUCKET");
+
+       private static final String TEST_DATA_DIR = "tests-" + 
UUID.randomUUID();
+
+       private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
+       private static final String SECRET_KEY = 
System.getenv("ARTIFACTS_AWS_SECRET_KEY");
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /**
+        * Number of tests executed.
+        */
+       private static int numRecursiveUploadTests = 0;
+
+       /**
+        * Will be updated by {@link #checkCredentialsAndSetup()} if the test 
is not skipped.
+        */
+       private static boolean skipTest = true;
+
+       @BeforeClass
+       public static void checkCredentialsAndSetup() throws IOException {
+               // check whether credentials exist
+               Assume.assumeTrue("AWS S3 bucket not configured, skipping 
test...", BUCKET != null);
+               Assume.assumeTrue("AWS S3 access key not configured, skipping 
test...", ACCESS_KEY != null);
+               Assume.assumeTrue("AWS S3 secret key not configured, skipping 
test...", SECRET_KEY != null);
+
+               skipTest = false;
+
+               setupCustomHadoopConfig();
+       }
+
+       @AfterClass
+       public static void resetFileSystemConfiguration() throws IOException {
+               FileSystem.initialize(new Configuration());
+       }
+
+       @AfterClass
+       public static void checkAtLeastOneTestRun() {
+               if (!skipTest) {
+                       assertThat(
+                               "No S3 filesystem upload test executed. Please 
activate the " +
+                                       "'include_hadoop_aws' build profile or 
set '-Dinclude_hadoop_aws' during build " +
+                                       "(Hadoop >= 2.6 moved S3 filesystems 
out of hadoop-common).",
+                               numRecursiveUploadTests, greaterThan(0));
+               }
+       }
+
+       /**
+        * Create a Hadoop config file containing S3 access credentials.
+        *
+        * <p>Note that we cannot use them as part of the URL since this may 
fail if the credentials
+        * contain a "/" (see <a 
href="https://issues.apache.org/jira/browse/HADOOP-3733";>HADOOP-3733</a>).
+        */
+       private static void setupCustomHadoopConfig() throws IOException {
+               File hadoopConfig = TEMP_FOLDER.newFile();
+               Map<String /* key */, String /* value */> parameters = new 
HashMap<>();
+
+               // set all different S3 fs implementation variants' 
configuration keys
+               parameters.put("fs.s3a.access.key", ACCESS_KEY);
+               parameters.put("fs.s3a.secret.key", SECRET_KEY);
+
+               parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
+               parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);
+
+               parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
+               parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);
+
+               try (PrintStream out = new PrintStream(new 
FileOutputStream(hadoopConfig))) {
+                       out.println("<?xml version=\"1.0\"?>");
+                       out.println("<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>");
+                       out.println("<configuration>");
+                       for (Map.Entry<String, String> entry : 
parameters.entrySet()) {
+                               out.println("\t<property>");
+                               out.println("\t\t<name>" + entry.getKey() + 
"</name>");
+                               out.println("\t\t<value>" + entry.getValue() + 
"</value>");
+                               out.println("\t</property>");
+                       }
+                       out.println("</configuration>");
+               }
+
+               final Configuration conf = new Configuration();
+               conf.setString(ConfigConstants.HDFS_SITE_CONFIG, 
hadoopConfig.getAbsolutePath());
+
+               FileSystem.initialize(conf);
+       }
+
+       /**
+        * Verifies that nested directories are properly copied with to the 
given S3 path (using the
+        * appropriate file system) during resource uploads for YARN.
+        *
+        * @param scheme
+        *              file system scheme
+        * @param pathSuffix
+        *              test path suffix which will be the test's target path
+        */
+       private void testRecursiveUploadForYarn(String scheme, String 
pathSuffix) throws Exception {
+               ++numRecursiveUploadTests;
+
+               final Path basePath = new Path(scheme + "://" + BUCKET + '/' + 
TEST_DATA_DIR);
+               final HadoopFileSystem fs = (HadoopFileSystem) 
basePath.getFileSystem();
+
+               assumeFalse(fs.exists(basePath));
+
+               try {
+                       final Path directory = new Path(basePath, pathSuffix);
+
+                       
YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
+                               new 
org.apache.hadoop.fs.Path(directory.toUri()), tempFolder, true);
+
+                       // now directory must be gone
+                       assertFalse(fs.exists(directory));
+               } finally {
+                       // clean up
+                       fs.delete(basePath, true);
+               }
+       }
+
+       /**
+        * Verifies that nested directories are properly copied with a 
<tt>s3a://</tt> file
+        * systems during resource uploads for YARN.
+        */
+       @Test
+       public void testRecursiveUploadForYarnS3() throws Exception {
+               try {
+                       Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
+               } catch (ClassNotFoundException e) {
+                       // not in the classpath, cannot run this test
+                       String msg = "Skipping test because S3FileSystem is not 
in the class path";
+                       log.info(msg);
+                       assumeNoException(msg, e);
+               }
+               testRecursiveUploadForYarn("s3", "testYarn-s3");
+       }
+
+       @Test
+       public void testRecursiveUploadForYarnS3n() throws Exception {
+               try {
+                       
Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+               } catch (ClassNotFoundException e) {
+                       // not in the classpath, cannot run this test
+                       String msg = "Skipping test because NativeS3FileSystem 
is not in the class path";
+                       log.info(msg);
+                       assumeNoException(msg, e);
+               }
+               testRecursiveUploadForYarn("s3n", "testYarn-s3n");
+       }
+
+       @Test
+       public void testRecursiveUploadForYarnS3a() throws Exception {
+               try {
+                       Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
+               } catch (ClassNotFoundException e) {
+                       // not in the classpath, cannot run this test
+                       String msg = "Skipping test because S3AFileSystem is 
not in the class path";
+                       log.info(msg);
+                       assumeNoException(msg, e);
+               }
+               testRecursiveUploadForYarn("s3a", "testYarn-s3a");
+       }
+}
