[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs + includes new unit tests for recursive uploads to hdfs:// targets + add a unit test for recursive file uploads to s3:// via s3a
[FLINK-4228][yarn/s3] turn the dependencies around Instead of having flink-s3-fs-hadoop depend on flink-yarn_<scala_version>, let flink-yarn depend on the s3 filesystem and implement the test there. This is safer with regards to the scala-independent flink-s3-fs-hadoop project. [FLINK-4228][yarn] change the S3 upload tests to use Hadoop's S3 implementations This is how YARN would use it and what should really be tested. [FLINK-4228][yarn] enable S3 tests for newer Hadoop versions - requires the 'include_hadoop_aws' build profile (or property) to be set - requires a newer aws-sdk version (than Hadoop pulls in) to work with our httpcomponents version - we also add a check that at least one S3 implementation is tested to not silently ignore all tests because of such a missing dependency This closes #4939. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36b80756 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36b80756 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36b80756 Branch: refs/heads/release-1.4 Commit: 36b807567d93c431c1498241a42c20221cb6a664 Parents: 666b1b2 Author: Nico Kruber <n...@data-artisans.com> Authored: Wed Nov 9 15:04:50 2016 -0500 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Nov 18 10:58:53 2017 +0100 ---------------------------------------------------------------------- .travis.yml | 2 +- flink-yarn/pom.xml | 63 ++++++ .../yarn/AbstractYarnClusterDescriptor.java | 192 +++++++++++----- .../main/java/org/apache/flink/yarn/Utils.java | 86 +++++--- .../apache/flink/yarn/YarnFileStageTest.java | 218 ++++++++++++++++++ .../flink/yarn/YarnFileStageTestS3ITCase.java | 220 +++++++++++++++++++ 6 files changed, 698 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/.travis.yml ---------------------------------------------------------------------- 
diff --git a/.travis.yml b/.travis.yml index daf2186..5e2ef74 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ matrix: - jdk: "oraclejdk8" env: - TEST="misc" - - PROFILE="-Dhadoop.version=2.8.0" + - PROFILE="-Dhadoop.version=2.8.0 -Dinclude_hadoop_aws" - CACHE_NAME=JDK8_H280_M - jdk: "openjdk8" env: http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index db6ee88..f621acf 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -31,6 +31,12 @@ under the License. <name>flink-yarn</name> <packaging>jar</packaging> + <properties> + <!-- for testing (will override Hadoop's default dependency on too low SDK versions that + do not work with our httpcomponents version) --> + <aws.sdk.version>1.11.171</aws.sdk.version> + </properties> + <dependencies> <!-- core dependencies --> @@ -153,6 +159,63 @@ under the License. </plugins> </build> </profile> + + <profile> + <!-- Hadoop >= 2.6 moved the S3 file systems from hadoop-common into hadoop-aws artifact + (see https://issues.apache.org/jira/browse/HADOOP-11074) + We can add the (test) dependency per default once 2.6 is the minimum required version. 
+ --> + <id>include_hadoop_aws</id> + <activation> + <property> + <name>include_hadoop_aws</name> + </property> + </activation> + <dependencies> + <!-- for the S3 tests of YarnFileStageTestS3ITCase --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <!-- The aws-java-sdk-core requires jackson 2.6, but + hadoop pulls in 2.3 --> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- override Hadoop's default dependency on too low SDK versions that do not work + with our httpcomponents version when initialising the s3a file system --> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>${aws.sdk.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-sts</artifactId> + <version>${aws.sdk.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + </profiles> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 8ecc371..5ac5c4e 100644 --- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.configuration.ConfigConstants; @@ -624,6 +625,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path final FileSystem fs = FileSystem.get(conf); + final Path homeDir = fs.getHomeDirectory(); // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && @@ -705,11 +707,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor StringBuilder envShipFileList = new StringBuilder(); // upload and register ship files - List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList); + List<String> systemClassPaths = uploadAndRegisterFiles( + systemShipFiles, + fs, + homeDir, + appId, + paths, + localResources, + envShipFileList); List<String> userClassPaths; if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) { - userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList); + userClassPaths = uploadAndRegisterFiles( + userJarFiles, + fs, + homeDir, + appId, + paths, + localResources, + envShipFileList); } else { userClassPaths = Collections.emptyList(); } @@ -739,32 +755,29 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } // Setup jar for ApplicationMaster - LocalResource appMasterJar = 
Records.newRecord(LocalResource.class); - Path remotePathJar = Utils.setupLocalResource( + Path remotePathJar = setupSingleLocalResource( + "flink.jar", fs, - appId.toString(), + appId, flinkJarPath, - appMasterJar, - fs.getHomeDirectory()); - - localResources.put("flink.jar", appMasterJar); + localResources, + homeDir, + ""); // Upload the flink configuration - LocalResource flinkConf = Records.newRecord(LocalResource.class); - // write out configuration file File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null); tmpConfigurationFile.deleteOnExit(); BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile); - Path remotePathConf = Utils.setupLocalResource( + Path remotePathConf = setupSingleLocalResource( + "flink-conf.yaml", fs, - appId.toString(), + appId, new Path(tmpConfigurationFile.getAbsolutePath()), - flinkConf, - fs.getHomeDirectory()); - - localResources.put("flink-conf.yaml", flinkConf); + localResources, + homeDir, + ""); paths.add(remotePathJar); classPathBuilder.append("flink.jar").append(File.pathSeparator); @@ -781,11 +794,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor ObjectOutputStream obOutput = new ObjectOutputStream(output);){ obOutput.writeObject(jobGraph); } - LocalResource jobgraph = Records.newRecord(LocalResource.class); - Path remoteJobGraph = - Utils.setupLocalResource(fs, appId.toString(), new Path(fp.toURI()), jobgraph, fs.getHomeDirectory()); - localResources.put("job.graph", jobgraph); - paths.add(remoteJobGraph); + + Path pathFromYarnURL = setupSingleLocalResource( + "job.graph", + fs, + appId, + new Path(fp.toURI()), + localResources, + homeDir, + ""); + paths.add(pathFromYarnURL); classPathBuilder.append("job.graph").append(File.pathSeparator); } catch (Exception e) { LOG.warn("Add job graph to local resource fail"); @@ -793,7 +811,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - Path yarnFilesDir = new 
Path(fs.getHomeDirectory(), ".flink/" + appId + '/'); + Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/'); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); fs.setPermission(yarnFilesDir, permission); // set permission for path. @@ -810,32 +828,44 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor if (krb5Config != null && krb5Config.length() != 0) { File krb5 = new File(krb5Config); LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath()); - LocalResource krb5ConfResource = Records.newRecord(LocalResource.class); Path krb5ConfPath = new Path(krb5.getAbsolutePath()); - remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory()); - localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); + remoteKrb5Path = setupSingleLocalResource( + Utils.KRB5_FILE_NAME, + fs, + appId, + krb5ConfPath, + localResources, + homeDir, + ""); File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME); LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath()); - LocalResource yarnConfResource = Records.newRecord(LocalResource.class); Path yarnSitePath = new Path(f.getAbsolutePath()); - remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory()); - localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); - + remoteYarnSiteXmlPath = setupSingleLocalResource( + Utils.YARN_SITE_FILE_NAME, + fs, + appId, + yarnSitePath, + localResources, + homeDir, + ""); hasKrb5 = true; } } // setup security tokens - LocalResource keytabResource = null; Path remotePathKeytab = null; String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); if (keytab != null) { LOG.info("Adding keytab {} to the AM container local resource bucket", keytab); - 
keytabResource = Records.newRecord(LocalResource.class); - Path keytabPath = new Path(keytab); - remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory()); - localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); + remotePathKeytab = setupSingleLocalResource( + Utils.KEYTAB_FILE_NAME, + fs, + appId, + new Path(keytab), + localResources, + homeDir, + ""); } final ContainerLaunchContext amContainer = setupApplicationMasterContainer( @@ -866,7 +896,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB())); appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString()); appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); - appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString()); + appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager())); appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); @@ -876,7 +906,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - if (keytabResource != null) { + if (remotePathKeytab != null) { appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString()); String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal); @@ -981,25 +1011,54 @@ 
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor return report; } - private static List<String> uploadAndRegisterFiles( - Collection<File> shipFiles, + /** + * Uploads and registers a single resource and adds it to <tt>localResources</tt>. + * + * @param key + * the key to add the resource under + * @param fs + * the remote file system to upload to + * @param appId + * application ID + * @param localSrcPath + * local path to the file + * @param localResources + * map of resources + * + * @return the remote path to the uploaded resource + */ + private static Path setupSingleLocalResource( + String key, FileSystem fs, - String appId, - List<Path> remotePaths, + ApplicationId appId, + Path localSrcPath, Map<String, LocalResource> localResources, - StringBuilder envShipFileList) throws IOException { - final List<String> classPaths = new ArrayList<>(2 + shipFiles.size()); - for (File shipFile : shipFiles) { - LocalResource shipResources = Records.newRecord(LocalResource.class); + Path targetHomeDir, + String relativeTargetPath) throws IOException, URISyntaxException { - Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); - Path remotePath = - Utils.setupLocalResource(fs, appId, shipLocalPath, shipResources, fs.getHomeDirectory()); + Tuple2<Path, LocalResource> resource = Utils.setupLocalResource( + fs, + appId.toString(), + localSrcPath, + targetHomeDir, + relativeTargetPath); - remotePaths.add(remotePath); + localResources.put(key, resource.f1); - localResources.put(shipFile.getName(), shipResources); + return resource.f0; + } + + static List<String> uploadAndRegisterFiles( + Collection<File> shipFiles, + FileSystem fs, + Path targetHomeDir, + ApplicationId appId, + List<Path> remotePaths, + Map<String, LocalResource> localResources, + StringBuilder envShipFileList) throws IOException, URISyntaxException { + final List<String> classPaths = new ArrayList<>(2 + shipFiles.size()); + for (File shipFile : shipFiles) { if 
(shipFile.isDirectory()) { // add directories to the classpath java.nio.file.Path shipPath = shipFile.toPath(); @@ -1011,17 +1070,40 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor throws IOException { java.nio.file.Path relativePath = parentPath.relativize(file); - classPaths.add(relativePath.toString()); - - return FileVisitResult.CONTINUE; + String key = relativePath.toString(); + try { + Path remotePath = setupSingleLocalResource( + key, + fs, + appId, + new Path(file.toUri()), + localResources, + targetHomeDir, + relativePath.getParent().toString()); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + + // add files to the classpath + classPaths.add(key); + + return FileVisitResult.CONTINUE; + } catch (URISyntaxException e) { + throw new IOException(e); + } } }); } else { + Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath()); + String key = shipFile.getName(); + Path remotePath = setupSingleLocalResource( + key, fs, appId, shipLocalPath, localResources, targetHomeDir, ""); + remotePaths.add(remotePath); + envShipFileList.append(key).append("=").append(remotePath).append(","); + // add files to the classpath - classPaths.add(shipFile.getName()); + classPaths.add(key); } - envShipFileList.append(remotePath).append(","); } return classPaths; } http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 32cbb64..652afec 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.api.java.tuple.Tuple2; import 
org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -117,33 +118,60 @@ public final class Utils { } /** + * Copy a local file to a remote file system. + * + * @param fs + * remote filesystem + * @param appId + * application ID + * @param localSrcPath + * path to the local file + * @param homedir + * remote home directory base (will be extended) + * @param relativeTargetPath + * relative target path of the file (will be prefixed be the full home directory we set up) + * * @return Path to remote file (usually hdfs) - * @throws IOException */ - public static Path setupLocalResource( - FileSystem fs, - String appId, Path localRsrcPath, - LocalResource appMasterJar, - Path homedir) throws IOException { + static Tuple2<Path, LocalResource> setupLocalResource( + FileSystem fs, + String appId, + Path localSrcPath, + Path homedir, + String relativeTargetPath) throws IOException { + + if (new File(localSrcPath.toUri().getPath()).isDirectory()) { + throw new IllegalArgumentException("File to copy must not be a directory: " + + localSrcPath); + } // copy resource to HDFS - String suffix = ".flink/" + appId + "/" + localRsrcPath.getName(); + String suffix = + ".flink/" + + appId + + (relativeTargetPath.isEmpty() ? 
"" : "/" + relativeTargetPath) + + "/" + localSrcPath.getName(); Path dst = new Path(homedir, suffix); - LOG.info("Copying from " + localRsrcPath + " to " + dst); - fs.copyFromLocalFile(localRsrcPath, dst); - registerLocalResource(fs, dst, appMasterJar); - return dst; + LOG.info("Copying from " + localSrcPath + " to " + dst); + + fs.copyFromLocalFile(false, true, localSrcPath, dst); + + // now create the resource instance + LocalResource resource = registerLocalResource(fs, dst); + return Tuple2.of(dst, resource); } - public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException { + private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException { + LocalResource localResource = Records.newRecord(LocalResource.class); FileStatus jarStat = fs.getFileStatus(remoteRsrcPath); localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri())); localResource.setSize(jarStat.getLen()); localResource.setTimestamp(jarStat.getModificationTime()); localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION); + return localResource; } public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException { @@ -340,10 +368,9 @@ public final class Utils { LocalResource keytabResource = null; if (remoteKeytabPath != null) { log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); - keytabResource = Records.newRecord(LocalResource.class); Path keytabPath = new Path(remoteKeytabPath); FileSystem fs = keytabPath.getFileSystem(yarnConfig); - registerLocalResource(fs, keytabPath, keytabResource); + keytabResource = registerLocalResource(fs, keytabPath); } //To support Yarn Secure Integration Test Scenario @@ -352,30 +379,28 @@ public final class Utils { boolean hasKrb5 = false; if (remoteYarnConfPath != null && remoteKrb5Path != 
null) { log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath); - yarnConfResource = Records.newRecord(LocalResource.class); Path yarnConfPath = new Path(remoteYarnConfPath); FileSystem fs = yarnConfPath.getFileSystem(yarnConfig); - registerLocalResource(fs, yarnConfPath, yarnConfResource); + yarnConfResource = registerLocalResource(fs, yarnConfPath); log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); - krb5ConfResource = Records.newRecord(LocalResource.class); Path krb5ConfPath = new Path(remoteKrb5Path); fs = krb5ConfPath.getFileSystem(yarnConfig); - registerLocalResource(fs, krb5ConfPath, krb5ConfResource); + krb5ConfResource = registerLocalResource(fs, krb5ConfPath); hasKrb5 = true; } // register Flink Jar with remote HDFS - LocalResource flinkJar = Records.newRecord(LocalResource.class); + final LocalResource flinkJar; { Path remoteJarPath = new Path(remoteFlinkJarPath); FileSystem fs = remoteJarPath.getFileSystem(yarnConfig); - registerLocalResource(fs, remoteJarPath, flinkJar); + flinkJar = registerLocalResource(fs, remoteJarPath); } // register conf with local fs - LocalResource flinkConf = Records.newRecord(LocalResource.class); + final LocalResource flinkConf; { // write the TaskManager configuration to a local file final File taskManagerConfigFile = @@ -385,8 +410,13 @@ public final class Utils { Path homeDirPath = new Path(clientHomeDir); FileSystem fs = homeDirPath.getFileSystem(yarnConfig); - setupLocalResource(fs, appId, - new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir)); + + flinkConf = setupLocalResource( + fs, + appId, + new Path(taskManagerConfigFile.toURI()), + homeDirPath, + "").f1; log.info("Prepared local resource for modified yaml: {}", flinkConf); } @@ -408,10 +438,11 @@ public final class Utils { // prepare additional files to be shipped for (String pathStr : shipListString.split(",")) { if (!pathStr.isEmpty()) { - 
LocalResource resource = Records.newRecord(LocalResource.class); - Path path = new Path(pathStr); - registerLocalResource(path.getFileSystem(yarnConfig), path, resource); - taskManagerLocalResources.put(path.getName(), resource); + String[] keyAndPath = pathStr.split("="); + require(keyAndPath.length == 2, "Invalid entry in ship file list: %s", pathStr); + Path path = new Path(keyAndPath[1]); + LocalResource resource = registerLocalResource(path.getFileSystem(yarnConfig), path); + taskManagerLocalResources.put(keyAndPath[0], resource); } } @@ -488,4 +519,5 @@ public final class Utils { throw new RuntimeException(String.format(message, values)); } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java new file mode 100644 index 0000000..4d38253 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Tests for verifying file staging during submission to YARN works. 
+ */ +public class YarnFileStageTest extends TestLogger { + + @ClassRule + public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static MiniDFSCluster hdfsCluster; + + private static Path hdfsRootPath; + + private org.apache.hadoop.conf.Configuration hadoopConfig; + + // ------------------------------------------------------------------------ + // Test setup and shutdown + // ------------------------------------------------------------------------ + + @BeforeClass + public static void createHDFS() throws Exception { + Assume.assumeTrue(!OperatingSystem.isWindows()); + + final File tempDir = CLASS_TEMP_DIR.newFolder(); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + hdfsRootPath = new Path(hdfsCluster.getURI()); + } + + @AfterClass + public static void destroyHDFS() { + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + hdfsCluster = null; + hdfsRootPath = null; + } + + @Before + public void initConfig() { + hadoopConfig = new org.apache.hadoop.conf.Configuration(); + hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString()); + } + + /** + * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file + * system (from a <tt>file:///absolute/path</tt> source path). 
+ */ + @Test + public void testCopyFromLocalRecursiveWithScheme() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true); + } + + /** + * Verifies that nested directories are properly copied with a <tt>hdfs://</tt> file + * system (from a <tt>/absolute/path</tt> source path). + */ + @Test + public void testCopyFromLocalRecursiveWithoutScheme() throws Exception { + final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig); + final Path targetDir = targetFileSystem.getWorkingDirectory(); + + testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, false); + } + + /** + * Verifies that nested directories are properly copied with the given filesystem and paths. + * + * @param targetFileSystem + * file system of the target path + * @param targetDir + * target path (URI like <tt>hdfs://...</tt>) + * @param tempFolder + * JUnit temporary folder rule to create the source directory with + * @param addSchemeToLocalPath + * whether add the <tt>file://</tt> scheme to the local path to copy from + */ + public static void testCopyFromLocalRecursive( + FileSystem targetFileSystem, + Path targetDir, + TemporaryFolder tempFolder, + boolean addSchemeToLocalPath) throws Exception { + + // directory must not yet exist + assertFalse(targetFileSystem.exists(targetDir)); + + final File srcDir = tempFolder.newFolder(); + final Path srcPath; + if (addSchemeToLocalPath) { + srcPath = new Path("file://" + srcDir.getAbsolutePath()); + } else { + srcPath = new Path(srcDir.getAbsolutePath()); + } + + HashMap<String /* (relative) path */, /* contents */ String> srcFiles = new HashMap<>(4); + + // create and fill source files + srcFiles.put("1", "Hello 1"); + srcFiles.put("2", "Hello 2"); + srcFiles.put("nested/3", "Hello nested/3"); + srcFiles.put("nested/4/5", "Hello nested/4/5"); 
+ for (Map.Entry<String, String> src : srcFiles.entrySet()) { + File file = new File(srcDir, src.getKey()); + //noinspection ResultOfMethodCallIgnored + file.getParentFile().mkdirs(); + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) { + out.writeUTF(src.getValue()); + } + } + + // copy the created directory recursively: + try { + List<Path> remotePaths = new ArrayList<>(); + HashMap<String, LocalResource> localResources = new HashMap<>(); + AbstractYarnClusterDescriptor.uploadAndRegisterFiles( + Collections.singletonList(new File(srcPath.toUri().getPath())), + targetFileSystem, + targetDir, + ApplicationId.newInstance(0, 0), + remotePaths, + localResources, + new StringBuilder()); + assertEquals(srcFiles.size(), localResources.size()); + + Path workDir = ConverterUtils + .getPathFromYarnURL(localResources.get(srcPath.getName() + "/1").getResource()) + .getParent(); + + RemoteIterator<LocatedFileStatus> targetFilesIterator = + targetFileSystem.listFiles(workDir, true); + HashMap<String /* (relative) path */, /* contents */ String> targetFiles = + new HashMap<>(4); + + final int workDirPrefixLength = + workDir.toString().length() + 1; // one more for the concluding "/" + while (targetFilesIterator.hasNext()) { + LocatedFileStatus targetFile = targetFilesIterator.next(); + + try (FSDataInputStream in = targetFileSystem.open(targetFile.getPath())) { + String absolutePathString = targetFile.getPath().toString(); + String relativePath = absolutePathString.substring(workDirPrefixLength); + targetFiles.put(relativePath, in.readUTF()); + + assertEquals("extraneous data in file " + relativePath, -1, in.read()); + } + } + + assertThat(targetFiles, equalTo(srcFiles)); + } finally { + // clean up + targetFileSystem.delete(targetDir, true); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/36b80756/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java 
---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java new file mode 100644 index 0000000..74fb596 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.yarn;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeNoException;

/**
 * Tests for verifying file staging during submission to YARN works with the S3A file system.
 *
 * <p>Requires the environment variables <tt>ARTIFACTS_AWS_BUCKET</tt>,
 * <tt>ARTIFACTS_AWS_ACCESS_KEY</tt> and <tt>ARTIFACTS_AWS_SECRET_KEY</tt> to be set; otherwise
 * all tests are skipped via JUnit assumptions.
 *
 * <p>Note that the setup is similar to <tt>org.apache.flink.fs.s3hadoop.HadoopS3FileSystemITCase</tt>.
 */
public class YarnFileStageTestS3ITCase extends TestLogger {

	// S3 bucket used as the upload target; null when credentials are not configured.
	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");

	// Unique per-JVM directory inside the bucket so concurrent CI runs do not collide.
	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();

	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");

	// Class-level folder holding the generated Hadoop config file (lives for the whole class).
	@ClassRule
	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

	// Per-test folder for the local files that get staged to S3.
	@Rule
	public final TemporaryFolder tempFolder = new TemporaryFolder();

	/**
	 * Number of tests executed.
	 *
	 * <p>Incremented by {@link #testRecursiveUploadForYarn(String, String)}; checked in
	 * {@link #checkAtLeastOneTestRun()} to detect a silently empty test run.
	 */
	private static int numRecursiveUploadTests = 0;

	/**
	 * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
	 */
	private static boolean skipTest = true;

	/**
	 * Skips the whole class when S3 credentials are absent; otherwise writes a custom
	 * Hadoop config containing them and points Flink's file systems at it.
	 */
	@BeforeClass
	public static void checkCredentialsAndSetup() throws IOException {
		// check whether credentials exist
		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);

		// only reached when no assumption failed, i.e. the tests will actually run
		skipTest = false;

		setupCustomHadoopConfig();
	}

	/**
	 * Restores the default (empty) Flink file-system configuration so later test classes
	 * in the same JVM do not pick up the custom Hadoop config written above.
	 */
	@AfterClass
	public static void resetFileSystemConfiguration() throws IOException {
		FileSystem.initialize(new Configuration());
	}

	/**
	 * Fails the build if credentials were present but no S3 upload test actually ran —
	 * this catches the case where all S3 file-system classes are missing from the class
	 * path (e.g. the 'include_hadoop_aws' profile was not activated) and every test was
	 * silently skipped via {@link Assume}.
	 */
	@AfterClass
	public static void checkAtLeastOneTestRun() {
		if (!skipTest) {
			assertThat(
				"No S3 filesystem upload test executed. Please activate the " +
					"'include_hadoop_aws' build profile or set '-Dinclude_hadoop_aws' during build " +
					"(Hadoop >= 2.6 moved S3 filesystems out of hadoop-common).",
				numRecursiveUploadTests, greaterThan(0));
		}
	}

	/**
	 * Create a Hadoop config file containing S3 access credentials.
	 *
	 * <p>Note that we cannot use them as part of the URL since this may fail if the credentials
	 * contain a "/" (see <a href="https://issues.apache.org/jira/browse/HADOOP-3733">HADOOP-3733</a>).
	 */
	private static void setupCustomHadoopConfig() throws IOException {
		File hadoopConfig = TEMP_FOLDER.newFile();
		Map<String /* key */, String /* value */> parameters = new HashMap<>();

		// set all different S3 fs implementation variants' configuration keys
		parameters.put("fs.s3a.access.key", ACCESS_KEY);
		parameters.put("fs.s3a.secret.key", SECRET_KEY);

		parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
		parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);

		parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
		parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);

		// write the parameters out as a minimal Hadoop *-site.xml configuration file
		try (PrintStream out = new PrintStream(new FileOutputStream(hadoopConfig))) {
			out.println("<?xml version=\"1.0\"?>");
			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
			out.println("<configuration>");
			for (Map.Entry<String, String> entry : parameters.entrySet()) {
				out.println("\t<property>");
				out.println("\t\t<name>" + entry.getKey() + "</name>");
				out.println("\t\t<value>" + entry.getValue() + "</value>");
				out.println("\t</property>");
			}
			out.println("</configuration>");
		}

		// make Flink's Hadoop-backed file systems load the generated config file
		final Configuration conf = new Configuration();
		conf.setString(ConfigConstants.HDFS_SITE_CONFIG, hadoopConfig.getAbsolutePath());

		FileSystem.initialize(conf);
	}

	/**
	 * Verifies that nested directories are properly copied to the given S3 path (using the
	 * appropriate file system) during resource uploads for YARN.
	 *
	 * @param scheme
	 * 		file system scheme ("s3", "s3n" or "s3a")
	 * @param pathSuffix
	 * 		test path suffix which will be the test's target path
	 *
	 * @throws Exception if the upload, verification, or cleanup fails
	 */
	private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
		// count executed tests so checkAtLeastOneTestRun() can verify at least one ran
		++numRecursiveUploadTests;

		final Path basePath = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
		final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();

		// skip (rather than clobber) if the target path unexpectedly already exists
		assumeFalse(fs.exists(basePath));

		try {
			final Path directory = new Path(basePath, pathSuffix);

			// delegate to the shared upload-and-verify logic; 'true' requests cleanup of
			// the remote target directory after verification
			YarnFileStageTest.testCopyFromLocalRecursive(fs.getHadoopFileSystem(),
				new org.apache.hadoop.fs.Path(directory.toUri()), tempFolder, true);

			// now directory must be gone
			assertFalse(fs.exists(directory));
		} finally {
			// clean up
			fs.delete(basePath, true);
		}
	}

	/**
	 * Verifies that nested directories are properly copied with a <tt>s3://</tt> file
	 * systems during resource uploads for YARN.
	 */
	@Test
	public void testRecursiveUploadForYarnS3() throws Exception {
		try {
			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
		} catch (ClassNotFoundException e) {
			// not in the classpath, cannot run this test
			String msg = "Skipping test because S3FileSystem is not in the class path";
			log.info(msg);
			assumeNoException(msg, e);
		}
		testRecursiveUploadForYarn("s3", "testYarn-s3");
	}

	/**
	 * Verifies that nested directories are properly copied with a <tt>s3n://</tt> file
	 * systems during resource uploads for YARN.
	 */
	@Test
	public void testRecursiveUploadForYarnS3n() throws Exception {
		try {
			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
		} catch (ClassNotFoundException e) {
			// not in the classpath, cannot run this test
			String msg = "Skipping test because NativeS3FileSystem is not in the class path";
			log.info(msg);
			assumeNoException(msg, e);
		}
		testRecursiveUploadForYarn("s3n", "testYarn-s3n");
	}

	/**
	 * Verifies that nested directories are properly copied with a <tt>s3a://</tt> file
	 * systems during resource uploads for YARN.
	 */
	@Test
	public void testRecursiveUploadForYarnS3a() throws Exception {
		try {
			Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
		} catch (ClassNotFoundException e) {
			// not in the classpath, cannot run this test
			String msg = "Skipping test because S3AFileSystem is not in the class path";
			log.info(msg);
			assumeNoException(msg, e);
		}
		testRecursiveUploadForYarn("s3a", "testYarn-s3a");
	}
}