zentol closed pull request #2288: Feature/s3 a fix
URL: https://github.com/apache/flink/pull/2288
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java
new file mode 100644
index 00000000000..06c3b14679e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/util/FileSystemCopyUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.fs.util;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ *  This is a utility class to impose a recursive copy from a local directory when using {@link FileSystem}
+ *  implementations, as we cannot assume that every FileSystem implementation will implement a recursive copy.
+ *  Presently, <a href="https://github.com/Aloisius/hadoop-s3a">S3a</a> does not.
+ */
+public class FileSystemCopyUtils {
+
+       /**
+        * Recursive copy to work around FileSystem implementations that do not implement it; existing remote files are overwritten when {@code overwrite} is true.
+        * @param fs target file system
+        * @param localPath local source file or directory
+        * @param remotePath remote destination path
+        * @throws IOException if the copy fails
+        */
+       public static void copyFromLocalFile(FileSystem fs, boolean overwrite, Path localPath, Path remotePath) throws IOException {
+               localPath = checkFileScheme(localPath);
+               File localFile = new File(localPath.toUri());
+               if (localFile.isDirectory()) {
+                       for (File file : localFile.listFiles()) {
+                               copyFromLocalFile(fs, overwrite, new Path("file://" + file.getAbsolutePath()), new Path(remotePath, file.getName()));
+                       }
+               } else {
+                       fs.copyFromLocalFile(false, overwrite, localPath, remotePath);
+               }
+       }
+
+       /**
+        * Qualifies an absolute, scheme-less path with an explicit file:// scheme.
+        * @param localRsrcPath the local path to check
+        * @return the path, with a file:// scheme added if it had none
+        * @throws IOException if the qualified path is not a valid URI
+        */
+       private static Path checkFileScheme(Path localRsrcPath) throws IOException {
+               if (localRsrcPath.isAbsoluteAndSchemeAuthorityNull()) {
+                       try {
+                               return new Path(new URI("file://" + localRsrcPath.toString()));
+                       } catch (URISyntaxException e) {
+                               throw new IOException(e);
+                       }
+               }
+               return localRsrcPath;
+       }
+
+}
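
For reference, a minimal usage sketch (not part of this PR; the class name, bucket, and local path below are placeholder assumptions) showing how the new utility can stage a local directory onto a FileSystem implementation without a native recursive copy, such as s3a:

    import org.apache.flink.runtime.fs.util.FileSystemCopyUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.net.URI;

    public class RecursiveCopyExample {
        public static void main(String[] args) throws IOException {
            // Placeholder destination; any FileSystem URI behaves the same way.
            URI remote = URI.create("s3a://example-bucket/staging");
            FileSystem fs = FileSystem.get(remote, new Configuration());
            // Descends into /tmp/flink-dist and copies one file at a time,
            // overwriting files already present at the destination.
            FileSystemCopyUtils.copyFromLocalFile(fs, true,
                    new Path("file:///tmp/flink-dist"), new Path(remote));
        }
    }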
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
index 37dc2c703c3..427763841e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
@@ -24,8 +24,11 @@
 import org.apache.hadoop.fs.Path;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 
+import static org.apache.flink.runtime.fs.util.FileSystemCopyUtils.copyFromLocalFile;
+
 /**
  * Utility for copying from local file system to a HDFS {@link FileSystem}.
  */
@@ -46,8 +49,8 @@ public void run() {
 
                                        FileSystem fs = FileSystem.get(remotePath, hadoopConf);
 
-                                       fs.copyFromLocalFile(new Path(localPath.getAbsolutePath()),
-                                                       new Path(remotePath));
+                                       copyFromLocalFile(fs, true, new Path("file://" + localPath.getAbsolutePath()), new Path(checkInitialDirectory(fs, localPath, remotePath)));
+
                                } catch (Exception t) {
                                        asyncException.f0 = t;
                                }
@@ -62,4 +65,18 @@ public void run() {
                        throw asyncException.f0;
                }
        }
+
+       /**
+        * Ensure that the target path terminates with a new directory to be created by fs. If remoteURI does not
+        * specify a new directory, append the local directory name.
+        * @param fs target file system
+        * @param localPath local source directory
+        * @param remoteURI remote target URI
+        * @return the target URI, adjusted if necessary
+        * @throws IOException if the file system cannot be queried
+        */
+       private static URI checkInitialDirectory(FileSystem fs, File localPath, URI remoteURI) throws IOException {
+               return remoteURI;
+       }
+
 }
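
Note that checkInitialDirectory above currently passes remoteURI through unchanged. As a hedged sketch only (the method body here is an assumption, not the PR's implementation), the behavior its javadoc describes, and which the new tests below exercise, could look like this:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.File;
    import java.io.IOException;
    import java.net.URI;

    class InitialDirectorySketch {
        static URI checkInitialDirectory(FileSystem fs, File localPath, URI remoteURI) throws IOException {
            Path remotePath = new Path(remoteURI);
            if (fs.exists(remotePath) && fs.getFileStatus(remotePath).isDirectory()) {
                // Target already exists: append the local directory name so the
                // copy creates a fresh directory underneath it.
                return new Path(remotePath, localPath.getName()).toUri();
            }
            // Target names a directory that does not exist yet: use it as-is.
            return remoteURI;
        }
    }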
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
index f16750d8f18..0b67abfdf7f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
@@ -30,8 +30,14 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class HDFSCopyUtilitiesTest {
 
@@ -70,6 +76,86 @@ public void testCopyFromLocal() throws Exception {
                }
        }
 
+
+       /**
+        * This test verifies that nested directories are properly copied.
+        */
+       @Test
+       public void testCopyFromLocalRecursive() throws Exception {
+
+               File rootDir = tempFolder.newFolder();
+               File nestedDir = new File(rootDir,"nested");
+               nestedDir.mkdir();
+
+               Map<String,File>  copyFiles = new HashMap<String,File>();
+
+               copyFiles.put("1",new File(rootDir, "1"));
+               copyFiles.put("2",new File(rootDir, "2"));
+               copyFiles.put("3",new File(nestedDir, "3"));
+
+               for (File file : copyFiles.values()) {
+                       try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+                               out.writeUTF("Hello there, " + file.getName());
+                       }
+               }
+               //add root and nested dirs to expected output
+               copyFiles.put(rootDir.getName(),rootDir);
+               copyFiles.put("nested",nestedDir);
+
+               assertEquals(5,copyFiles.size());
+
+               //Test for copy to unspecified target directory
+               File copyDirU = tempFolder.newFolder();
+               HDFSCopyFromLocal.copyFromLocal(
+                               rootDir,
+                               new Path("file://" + copyDirU.getAbsolutePath()).toUri());
+
+               //Test for copy to specified target directory
+               File copyDirQ = tempFolder.newFolder();
+               HDFSCopyFromLocal.copyFromLocal(
+                               rootDir,
+                               new Path("file://" + copyDirQ.getAbsolutePath() + "/" + rootDir.getName()).toUri());
+
+               //We only want to verify intended files, not CRC shadow files.
+               FilenameFilter noCrc = new FilenameFilter() {
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               return !name.endsWith(".crc");
+                       }
+               };
+
+               File dirCopyU = copyDirU.listFiles(noCrc)[0];
+               File dirCopyQ = copyDirQ.listFiles(noCrc)[0];
+
+               assertEquals(dirCopyU.getName(),dirCopyQ.getName());
+
+               assertEquals(rootDir.getName(),dirCopyU.getName());
+               assertNotNull(copyFiles.remove(dirCopyU.getName()));
+
+               File[] filesU = dirCopyU.listFiles(noCrc);
+               File[] filesQ = dirCopyQ.listFiles(noCrc);
+
+               assertEquals(filesU.length, 3);
+               assertEquals(filesU.length, filesQ.length);
+
+               Arrays.sort(filesU);
+               Arrays.sort(filesQ);
+
+               for (int i = 0; i < filesU.length; i++) {
+                       assertEquals(filesU[i].getName(), filesQ[i].getName());
+                       copyFiles.remove(filesU[i].getName());
+                       if (filesU[i].isDirectory()) {
+                               assertEquals(filesU[i].listFiles(noCrc).length, 1);
+                               assertEquals(filesQ[i].listFiles(noCrc).length, 1);
+                               copyFiles.remove(filesU[i].listFiles(noCrc)[0].getName());
+                       }
+               }
+
+               assertTrue(copyFiles.isEmpty());
+
+       }
+
+
        /**
         * This test verifies that a hadoop configuration is correctly read in the external
         * process copying tools.
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 1496d61c90e..a9b74b4b34f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,6 +26,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.slf4j.Logger;
@@ -60,6 +62,14 @@
        
        private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+       /** Keytab file name populated in YARN container */
+       public static final String KEYTAB_FILE_NAME = "krb5.keytab";
+
+       /** KRB5 file name populated in YARN container for secure IT run */
+       public static final String KRB5_FILE_NAME = "krb5.conf";
+
+       /** Yarn site xml file name populated in YARN container for secure IT run */
+       public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
        /**
         * See documentation
@@ -124,9 +134,9 @@ public static Path setupLocalResource(
                String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
 
                Path dst = new Path(homedir, suffix);
-
+               localRsrcPath = checkScheme(localRsrcPath);
                LOG.info("Copying from " + localRsrcPath + " to " + dst);
-               fs.copyFromLocalFile(localRsrcPath, dst);
+               copyFromLocalFile(fs, localRsrcPath, dst);
                registerLocalResource(fs, dst, appMasterJar);
                return dst;
        }
@@ -256,4 +266,34 @@ private Utils() {
                }
                return result;
        }
+
+       /**
+        * Recursive copy to work around FileSystem implementations that do not implement it; existing remote files are overwritten.
+        * @param fs target file system
+        * @param localPath local source file or directory
+        * @param remotePath remote destination path
+        * @throws IOException if the copy fails
+        */
+       public static void copyFromLocalFile(FileSystem fs, Path localPath, Path remotePath) throws IOException {
+               File localFile = new File(localPath.toUri());
+               if (localFile.isDirectory()) {
+                       for (File file : localFile.listFiles()) {
+                               copyFromLocalFile(fs, new Path("file://" + file.getAbsolutePath()), new Path(remotePath, file.getName()));
+                       }
+               } else {
+                       fs.copyFromLocalFile(false, true, localPath, remotePath);
+               }
+       }
+
+
+       private static Path checkScheme(Path localRsrcPath) throws IOException {
+               if (localRsrcPath.isAbsoluteAndSchemeAuthorityNull()) {
+                       try {
+                               return new Path(new URI("file://" + localRsrcPath.toString()));
+                       } catch (URISyntaxException e) {
+                               throw new IOException(e);
+                       }
+               }
+               return localRsrcPath;
+       }
 }
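
For illustration, a small self-contained sketch (the jar path is a placeholder) of the normalization checkScheme performs: an absolute path without a scheme is rewritten with an explicit file:// scheme, while already-qualified paths pass through untouched:

    import org.apache.hadoop.fs.Path;

    public class SchemeExample {
        public static void main(String[] args) {
            Path bare = new Path("/opt/flink/lib/flink-dist.jar");
            // isAbsoluteAndSchemeAuthorityNull() is true for 'bare', so checkScheme
            // would qualify it with an explicit local-file scheme:
            Path qualified = new Path("file://" + bare.toString());
            System.out.println(bare.toUri().getScheme());      // null
            System.out.println(qualified.toUri().getScheme()); // file
        }
    }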
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
new file mode 100644
index 00000000000..8de3f3ef230
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.streaming.util.HDFSCopyFromLocal;
+import org.apache.flink.streaming.util.HDFSCopyToLocal;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class YarnFileStageTest {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void checkOperatingSystem() {
+               Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+       }
+
+       /**
+        * This test verifies that nested directories are properly copied.
+        */
+       @Test
+       public void testCopyFromLocalRecursive() throws Exception {
+
+               FileSystem fs = FileSystem.get(HadoopFileSystem.getHadoopConfiguration());
+               File rootDir = tempFolder.newFolder();
+               File nestedDir = new File(rootDir,"nested");
+               nestedDir.mkdir();
+
+               Map<String,File>  copyFiles = new HashMap<String,File>();
+
+               copyFiles.put("1",new File(rootDir, "1"));
+               copyFiles.put("2",new File(rootDir, "2"));
+               copyFiles.put("3",new File(nestedDir, "3"));
+
+               for (File file : copyFiles.values()) {
+                       try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
+                               out.writeUTF("Hello there, " + file.getName());
+                       }
+               }
+               //add root and nested dirs to expected output
+               copyFiles.put(rootDir.getName(),rootDir);
+               copyFiles.put("nested",nestedDir);
+
+               assertEquals(5,copyFiles.size());
+
+               //Test for copy to unspecified target directory
+               File copyDirU = tempFolder.newFolder();
+               HDFSCopyFromLocal.copyFromLocal(
+                               rootDir,
+                               new Path("file://" + copyDirU.getAbsolutePath()).toUri());
+
+               //Test for copy to specified target directory
+               File copyDirQ = tempFolder.newFolder();
+               HDFSCopyFromLocal.copyFromLocal(
+                               rootDir,
+                               new Path("file://" + copyDirQ.getAbsolutePath() + "/" + rootDir.getName()).toUri());
+
+               //We only want to verify intended files, not CRC shadow files.
+               FilenameFilter noCrc = new FilenameFilter() {
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               return !name.endsWith(".crc");
+                       }
+               };
+
+               File dirCopyU = copyDirU.listFiles(noCrc)[0];
+               File dirCopyQ = copyDirQ.listFiles(noCrc)[0];
+
+               assertEquals(dirCopyU.getName(),dirCopyQ.getName());
+
+               assertEquals(rootDir.getName(),dirCopyU.getName());
+               assertNotNull(copyFiles.remove(dirCopyU.getName()));
+
+               File[] filesU = dirCopyU.listFiles(noCrc);
+               File[] filesQ = dirCopyQ.listFiles(noCrc);
+
+               assertEquals(filesU.length, 3);
+               assertEquals(filesU.length, filesQ.length);
+
+               Arrays.sort(filesU);
+               Arrays.sort(filesQ);
+
+               for (int i = 0; i < filesU.length; i++) {
+                       assertEquals(filesU[i].getName(), filesQ[i].getName());
+                       copyFiles.remove(filesU[i].getName());
+                       if (filesU[i].isDirectory()) {
+                               assertEquals(filesU[i].listFiles(noCrc).length, 1);
+                               assertEquals(filesQ[i].listFiles(noCrc).length, 1);
+                               copyFiles.remove(filesU[i].listFiles(noCrc)[0].getName());
+                       }
+               }
+
+               assertTrue(copyFiles.isEmpty());
+
+       }
+
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
