zentol closed pull request #2288: Feature/s3 a fix URL: https://github.com/apache/flink/pull/2288
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java new file mode 100644 index 00000000000..06c3b14679e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.fs.util; + + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * This is a utility class to impose recursive copy from local directory when using {@link FileSystem} implementations + * as we cannot assume that all FileSystem implementations will implement a recursive copy. + * Presently, <a "href"="https://github.com/Aloisius/hadoop-s3a">S3a</a>does not. 
+ */ +public class FileSystemCopyUtils { + + /** + * Recursive copy to work around FileSystem implementations that do not implement it. + * @param fs + * @param localPath + * @param remotePath + * @throws IOException + */ + public static void copyFromLocalFile(FileSystem fs, boolean overwrite, Path localPath, Path remotePath) throws IOException { + localPath = checkFileScheme(localPath); + File localFile = new File(localPath.toUri()); + if (localFile.isDirectory()) { + for (File file : localFile.listFiles()) { + copyFromLocalFile(fs, overwrite, new Path("file://" + file.getAbsolutePath()), new Path(remotePath,file.getName())); + } + } else { + fs.copyFromLocalFile(false, overwrite, localPath,remotePath); + } + } + + /** + * All paths + * @param localRsrcPath + * @return + * @throws IOException + */ + private static Path checkFileScheme(Path localRsrcPath) throws IOException { + if (localRsrcPath.isAbsoluteAndSchemeAuthorityNull()) { + try { + return new Path(new URI("file://" + localRsrcPath.toString())); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + return localRsrcPath; + } + +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java index 37dc2c703c3..427763841e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java @@ -24,8 +24,11 @@ import org.apache.hadoop.fs.Path; import java.io.File; +import java.io.IOException; import java.net.URI; +import static org.apache.flink.runtime.fs.util.FileSystemCopyUtils.copyFromLocalFile; + /** * Utility for copying from local file system to a HDFS {@link FileSystem}. 
*/ @@ -46,8 +49,8 @@ public void run() { FileSystem fs = FileSystem.get(remotePath, hadoopConf); - fs.copyFromLocalFile(new Path(localPath.getAbsolutePath()), - new Path(remotePath)); + copyFromLocalFile(fs, true, new Path("file://" + localPath.getAbsolutePath()), new Path(checkInitialDirectory(fs,localPath,remotePath))); + } catch (Exception t) { asyncException.f0 = t; } @@ -62,4 +65,18 @@ public void run() { throw asyncException.f0; } } + + /** + * Ensure that target path terminates with a new directory to be created by fs. If remoteURI does not specify a new + * directory, append local directory name. + * @param fs + * @param localPath + * @param remoteURI + * @return + * @throws IOException + */ + private static URI checkInitialDirectory(FileSystem fs, File localPath, URI remoteURI) throws IOException { + return remoteURI; + } + } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java index f16750d8f18..0b67abfdf7f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java @@ -30,8 +30,14 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class HDFSCopyUtilitiesTest { @@ -70,6 +76,86 @@ public void testCopyFromLocal() throws Exception { } } + + /** + * This test verifies that nested directories are properly copied. 
+ */ + @Test + public void testCopyFromLocalRecursive() throws Exception { + + File rootDir = tempFolder.newFolder(); + File nestedDir = new File(rootDir,"nested"); + nestedDir.mkdir(); + + Map<String,File> copyFiles = new HashMap<String,File>(); + + copyFiles.put("1",new File(rootDir, "1")); + copyFiles.put("2",new File(rootDir, "2")); + copyFiles.put("3",new File(nestedDir, "3")); + + for (File file : copyFiles.values()) { + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) { + out.writeUTF("Hello there, " + file.getName()); + } + } + //add root and nested dirs to expected output + copyFiles.put(rootDir.getName(),rootDir); + copyFiles.put("nested",nestedDir); + + assertEquals(5,copyFiles.size()); + + //Test for copy to unspecified target directory + File copyDirU = tempFolder.newFolder(); + HDFSCopyFromLocal.copyFromLocal( + rootDir, + new Path("file://" + copyDirU.getAbsolutePath()).toUri()); + + //Test for copy to specified target directory + File copyDirQ = tempFolder.newFolder(); + HDFSCopyFromLocal.copyFromLocal( + rootDir, + new Path("file://" + copyDirQ.getAbsolutePath() + "/" + rootDir.getName()).toUri()); + + //We only want to verify intended files, not CRC shadow files. 
+ FilenameFilter noCrc = new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return !name.endsWith(".crc"); + } + }; + + File dirCopyU = copyDirU.listFiles(noCrc)[0]; + File dirCopyQ = copyDirQ.listFiles(noCrc)[0]; + + assertEquals(dirCopyU.getName(),dirCopyQ.getName()); + + assertEquals(rootDir.getName(),dirCopyU.getName()); + assertNotNull(copyFiles.remove(dirCopyU.getName())); + + File[] filesU = dirCopyU.listFiles(noCrc); + File[] filesQ = dirCopyQ.listFiles(noCrc); + + assertEquals(filesU.length, 3); + assertEquals(filesU.length, filesQ.length); + + Arrays.sort(filesU); + Arrays.sort(filesQ); + + for (int i = 0; i < filesU.length; i++) { + assertEquals(filesU[i].getName(), filesQ[i].getName()); + copyFiles.remove(filesU[i].getName()); + if (filesU[i].isDirectory()) { + assertEquals(filesU[i].listFiles(noCrc).length,1); + assertEquals(filesQ[i].listFiles(noCrc).length,1); + copyFiles.remove(filesU[i].listFiles(noCrc)[0].getName()); + } + } + + assertTrue(copyFiles.isEmpty()); + + } + + /** * This test verifies that a hadoop configuration is correctly read in the external * process copying tools. diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 1496d61c90e..a9b74b4b34f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.net.URI; +import java.net.URISyntaxException; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.slf4j.Logger; @@ -60,6 +62,14 @@ private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + /** Keytab file name populated in YARN container */ + public static final String KEYTAB_FILE_NAME = "krb5.keytab"; + + /** KRB5 file name populated in YARN container for secure IT run */ + public static final String KRB5_FILE_NAME = "krb5.conf"; + + /** Yarn site xml file name populated in YARN container for secure IT run */ + public static final String YARN_SITE_FILE_NAME = "yarn-site.xml"; /** * See documentation @@ -124,9 +134,9 @@ public static Path setupLocalResource( String suffix = ".flink/" + appId + "/" + localRsrcPath.getName(); Path dst = new Path(homedir, suffix); - + localRsrcPath = checkScheme(localRsrcPath); LOG.info("Copying from " + localRsrcPath + " to " + dst); - fs.copyFromLocalFile(localRsrcPath, dst); + copyFromLocalFile(fs,localRsrcPath, dst); registerLocalResource(fs, dst, appMasterJar); return dst; } @@ -256,4 +266,34 @@ private Utils() { } return result; } + + /** + * Recursive copy to work around FileSystem implementations that do not implement it. 
+ * @param fs + * @param localPath + * @param remotePath + * @throws IOException + */ + public static void copyFromLocalFile(FileSystem fs, Path localPath, Path remotePath) throws IOException { + File localFile = new File(localPath.toUri()); + if (localFile.isDirectory()) { + for (File file : localFile.listFiles()) { + copyFromLocalFile(fs, new Path("file://" + file.getAbsolutePath()), new Path(remotePath,file.getName())); + } + } else { + fs.copyFromLocalFile(false, true, localPath,remotePath); + } + } + + + private static Path checkScheme(Path localRsrcPath) throws IOException { + if (localRsrcPath.isAbsoluteAndSchemeAuthorityNull()) { + try { + return new Path(new URI("file://" + localRsrcPath.toString())); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + return localRsrcPath; + } } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java new file mode 100644 index 00000000000..8de3f3ef230 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.yarn; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.streaming.util.HDFSCopyFromLocal; +import org.apache.flink.streaming.util.HDFSCopyToLocal; +import org.apache.flink.util.OperatingSystem; +import org.apache.hadoop.fs.FileSystem; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + +public class YarnFileStageTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void checkOperatingSystem() { + Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); + } + + /** + * This test verifies that nested directories are properly copied. 
+ */ + @Test + public void testCopyFromLocalRecursive() throws Exception { + + FileSystem fs = FileSystem.get(HadoopFileSystem.getHadoopConfiguration()); + File rootDir = tempFolder.newFolder(); + File nestedDir = new File(rootDir,"nested"); + nestedDir.mkdir(); + + Map<String,File> copyFiles = new HashMap<String,File>(); + + copyFiles.put("1",new File(rootDir, "1")); + copyFiles.put("2",new File(rootDir, "2")); + copyFiles.put("3",new File(nestedDir, "3")); + + for (File file : copyFiles.values()) { + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) { + out.writeUTF("Hello there, " + file.getName()); + } + } + //add root and nested dirs to expected output + copyFiles.put(rootDir.getName(),rootDir); + copyFiles.put("nested",nestedDir); + + assertEquals(5,copyFiles.size()); + + //Test for copy to unspecified target directory + File copyDirU = tempFolder.newFolder(); + HDFSCopyFromLocal.copyFromLocal( + rootDir, + new Path("file://" + copyDirU.getAbsolutePath()).toUri()); + + //Test for copy to specified target directory + File copyDirQ = tempFolder.newFolder(); + HDFSCopyFromLocal.copyFromLocal( + rootDir, + new Path("file://" + copyDirQ.getAbsolutePath() + "/" + rootDir.getName()).toUri()); + + //We only want to verify intended files, not CRC shadow files. 
+ FilenameFilter noCrc = new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return !name.endsWith(".crc"); + } + }; + + File dirCopyU = copyDirU.listFiles(noCrc)[0]; + File dirCopyQ = copyDirQ.listFiles(noCrc)[0]; + + assertEquals(dirCopyU.getName(),dirCopyQ.getName()); + + assertEquals(rootDir.getName(),dirCopyU.getName()); + assertNotNull(copyFiles.remove(dirCopyU.getName())); + + File[] filesU = dirCopyU.listFiles(noCrc); + File[] filesQ = dirCopyQ.listFiles(noCrc); + + assertEquals(filesU.length, 3); + assertEquals(filesU.length, filesQ.length); + + Arrays.sort(filesU); + Arrays.sort(filesQ); + + for (int i = 0; i < filesU.length; i++) { + assertEquals(filesU[i].getName(), filesQ[i].getName()); + copyFiles.remove(filesU[i].getName()); + if (filesU[i].isDirectory()) { + assertEquals(filesU[i].listFiles(noCrc).length,1); + assertEquals(filesQ[i].listFiles(noCrc).length,1); + copyFiles.remove(filesU[i].listFiles(noCrc)[0].getName()); + } + } + + assertTrue(copyFiles.isEmpty()); + + } + + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services